Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]
rkhachatryan commented on code in PR #24904: URL: https://github.com/apache/flink/pull/24904#discussion_r1633189008 ## docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html: ## Review Comment: I guess this file is a leftover and should be removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]
ashangit commented on PR #833: URL: https://github.com/apache/flink-kubernetes-operator/pull/833#issuecomment-2158258410 Updated the PR to take into account this [comment](https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17853606&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17853606) in the Jira ticket -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]
gaborgsomogyi merged PR #24891: URL: https://github.com/apache/flink/pull/24891 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35525][yarn] Add a token services configuration to allow obtained token to be passed to Yarn AM [flink]
gaborgsomogyi commented on PR #24891: URL: https://github.com/apache/flink/pull/24891#issuecomment-2158262913 @wForget thanks for your efforts! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-XXX][planner] Support custom shuffle for lookup join [flink]
WencongLiu opened a new pull request, #24920: URL: https://github.com/apache/flink/pull/24920 ## What is the purpose of the change _Support custom shuffle for lookup join_ ## Verifying this change UT and ITs. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not documented -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-XXX][planner] Support custom shuffle for lookup join [flink]
flinkbot commented on PR #24920: URL: https://github.com/apache/flink/pull/24920#issuecomment-2158287262 ## CI report: * 97c210a011dd34e0c19deffe746cb3e88b355ce2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35528][task] Skip execution of interruptible mails when yielding [flink]
pnowojski commented on code in PR #24904: URL: https://github.com/apache/flink/pull/24904#discussion_r1633225150 ## docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html: ## Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]
gaborgsomogyi commented on code in PR #24914: URL: https://github.com/apache/flink/pull/24914#discussion_r1633243846 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ## @@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } -private static Path assembleCompactedFilePath(Path uncompactedPath) { -String uncompactedName = uncompactedPath.getName(); -if (uncompactedName.startsWith(".")) { -uncompactedName = uncompactedName.substring(1); -} -return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); -} - -private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { +@VisibleForTesting +static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { -return CompactingFileWriter.Type.OUTPUT_STREAM; +return fileCompactor instanceof ConcatFileCompactor +&& ((ConcatFileCompactor) fileCompactor).isCompressed() +? Type.COMPRESSED_STREAM +: Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { -return CompactingFileWriter.Type.RECORD_WISE; +return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + +private static Path assembleCompactedFilePath(Path uncompactedPath) { +String uncompactedName = uncompactedPath.getName(); +if (uncompactedName.startsWith(".")) { +uncompactedName = uncompactedName.substring(1); +} +return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); Review Comment: No strong opinion just asking. Compaction/compression normally adds postfix, why prefix here? ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ## @@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } -private static Path assembleCompactedFilePath(Path uncompactedPath) { -String uncompactedName = uncompactedPath.getName(); -if (uncompactedName.startsWith(".")) { -uncompactedName = uncompactedName.substring(1); -} -return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); -} - -private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { +@VisibleForTesting +static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { -return CompactingFileWriter.Type.OUTPUT_STREAM; +return fileCompactor instanceof ConcatFileCompactor +&& ((ConcatFileCompactor) fileCompactor).isCompressed() +? Type.COMPRESSED_STREAM +: Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { -return CompactingFileWriter.Type.RECORD_WISE; +return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + +private static Path assembleCompactedFilePath(Path uncompactedPath) { +String uncompactedName = uncompactedPath.getName(); +if (uncompactedName.startsWith(".")) { Review Comment: Are we calling this with `..`? 
## flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java: ## @@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter @Nullable final Path targetPath; -private CompactingFileWriter.Type writeType = null; +private Type writeType = null; Review Comment: When we remove the `CompactingFileWriter` prefix then `Type` itself is hard to read. I suggest either put back the prefix or rename it to something more meaningful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
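Purely to illustrate the prefix-vs-postfix question in the first review comment above, a suffix-based variant of `assembleCompactedFilePath` could look like the sketch below; `COMPACTED_SUFFIX` and the wrapper class are hypothetical, and the PR itself keeps the existing `COMPACTED_PREFIX` naming.

```java
import org.apache.flink.core.fs.Path;

final class CompactedPathSketch {
    // Hypothetical suffix; the PR under review uses a COMPACTED_PREFIX instead.
    private static final String COMPACTED_SUFFIX = ".compacted";

    static Path assembleCompactedFilePathWithSuffix(Path uncompactedPath) {
        String uncompactedName = uncompactedPath.getName();
        if (uncompactedName.startsWith(".")) {
            // in-progress part files are hidden (dot-prefixed); drop the dot
            uncompactedName = uncompactedName.substring(1);
        }
        return new Path(uncompactedPath.getParent(), uncompactedName + COMPACTED_SUFFIX);
    }
}
```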
Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]
hlteoh37 commented on code in PR #142: URL: https://github.com/apache/flink-connector-aws/pull/142#discussion_r1633262253 ## flink-connector-aws/flink-connector-kinesis/pom.xml: ## @@ -354,6 +354,12 @@ under the License. + + org.apache.flink.connector.aws.sink + + org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink + Review Comment: Ok sounds good - we discussed offline and suggested changing the shading in `flink-connector-kinesis` to include the whole `flink-connector-aws-base` module instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]
z3d1k commented on code in PR #142: URL: https://github.com/apache/flink-connector-aws/pull/142#discussion_r1633275869 ## flink-connector-aws/flink-connector-kinesis/pom.xml: ## @@ -354,6 +354,12 @@ under the License. + + org.apache.flink.connector.aws.sink + + org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.sink + Review Comment: updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]
ferenc-csaky commented on code in PR #24914: URL: https://github.com/apache/flink/pull/24914#discussion_r1633282273 ## flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java: ## @@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter @Nullable final Path targetPath; -private CompactingFileWriter.Type writeType = null; +private Type writeType = null; Review Comment: It was used in a mixed manner in the file: in some places `Type`, in other places `CompactingFileWriter.Type`. The class itself (indirectly) implements `CompactingFileWriter`, so IMO `Type` together with the variable name is definitive enough, but TBH I do not have a strong opinion; personally I prefer shorter code when I have a choice. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 [flink]
lincoln-lil commented on code in PR #24889: URL: https://github.com/apache/flink/pull/24889#discussion_r164828 ## docs/layouts/shortcodes/generated/optimizer_config_configuration.html: ## @@ -65,6 +71,12 @@ Enum When it is `TRY_RESOLVE`, the optimizer tries to resolve the correctness issue caused by 'Non-Deterministic Updates' (NDU) in a changelog pipeline. Changelog may contain kinds of message types: Insert (I), Delete (D), Update_Before (UB), Update_After (UA). There's no NDU problem in an insert only changelog pipeline. For updates, there are three main NDU problems:1. Non-deterministic functions, include scalar, table, aggregate functions, both builtin and custom ones.2. LookupJoin on an evolving source3. Cdc-source carries metadata fields which are system columns, not belongs to the entity data itself.For the first step, the optimizer automatically enables the materialization for No.2(LookupJoin) if needed, and gives the detailed error message for No.1(Non-deterministic functions) and No.3(Cdc-source with metadata) which is relatively easier to solve by changing the SQL.Default value is `IGNORE`, the optimizer does no changes. Possible values:"TRY_RESOLVE""IGNORE" + + table.optimizer.reuse-optimize-block-with-digest-enabled Review Comment: Add 'Batch & Streaming' label. ## docs/layouts/shortcodes/generated/optimizer_config_configuration.html: ## @@ -125,5 +125,11 @@ Boolean If set to true, it will merge projects when converting SqlNode to RelNode.Note: it is not recommended to turn on unless you are aware of possible side effects, such as causing the output of certain non-deterministic expressions to not meet expectations(see FLINK-20887). + +table.optimizer.union-all-as-breakpoint-enabled Streaming Review Comment: Should be Batch & Streaming, see `BatchCommonSubGraphBasedOptimizer#doOptimize` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]
dannycranmer commented on PR #142: URL: https://github.com/apache/flink-connector-aws/pull/142#issuecomment-2158665814 There is a risk this will break user code depending on these libs, right? Given this is a bug, and they should not be using the shaded libs, I think this is ok, thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35435] Add timeout Configuration to Async Sink [flink]
dannycranmer merged PR #24839: URL: https://github.com/apache/flink/pull/24839 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]
dannycranmer commented on code in PR #136: URL: https://github.com/apache/flink-connector-aws/pull/136#discussion_r1633458080 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DefaultDynamoDbElementConverter.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +/** + * Default implementation of {@link ElementConverter} that lazily falls back to {@link + * DynamoDbTypeInformedElementConverter}. + */ +@PublicEvolving +public class DefaultDynamoDbElementConverter +implements ElementConverter { + +private ElementConverter elementConverter; + +public DefaultDynamoDbElementConverter() {} + +@Override +public DynamoDbWriteRequest apply(T t, SinkWriter.Context context) { +if (elementConverter == null) { +TypeInformation typeInfo = (TypeInformation) TypeInformation.of(t.getClass()); +if (!(typeInfo instanceof CompositeType)) { +throw new IllegalArgumentException("The input type must be a CompositeType."); +} + +elementConverter = +new DynamoDbTypeInformedElementConverter<>((CompositeType) typeInfo); +} + +return elementConverter.apply(t, context); +} + +@Override +public void open(Sink.InitContext context) {} Review Comment: nit: Do we need this? I think it has a no-op default implementation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
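For context, the default converter above derives the `CompositeType` lazily from the first record's runtime class. A user who knows the element type up front can build the type-informed converter eagerly instead. A minimal sketch with a made-up `Order` POJO; the generic constructor signature of `DynamoDbTypeInformedElementConverter` is assumed from the diff, and the cast relies on `TypeInformation.of` returning a `PojoTypeInfo`, which is a `CompositeType`.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.connector.dynamodb.sink.DynamoDbTypeInformedElementConverter;

public class EagerTypeInformedConverterExample {
    /** Made-up POJO used only for illustration. */
    public static class Order {
        public String orderId;
        public double amount;
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // For a POJO, TypeInformation.of(...) returns a PojoTypeInfo, which is a CompositeType,
        // i.e. it satisfies the same precondition that DefaultDynamoDbElementConverter checks.
        CompositeType<Order> typeInfo = (CompositeType<Order>) TypeInformation.of(Order.class);
        DynamoDbTypeInformedElementConverter<Order> converter =
                new DynamoDbTypeInformedElementConverter<>(typeInfo);
        System.out.println("Converter created for " + typeInfo);
    }
}
```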
Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]
dannycranmer commented on PR #136: URL: https://github.com/apache/flink-connector-aws/pull/136#issuecomment-2158683163 @hlteoh37 / @z3d1k can you please take a look at this one too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]
z3d1k commented on PR #142: URL: https://github.com/apache/flink-connector-aws/pull/142#issuecomment-2158839029 > There is a risk this will break user code depending on these libs, right? Given this is a bug, and they should not be using the shaded libs, I think this is ok, thoughts? The classes added to the relocation are specific to the connector implementation. Not only should users generally not use the shaded libs, they also should not use the internal connector classes affected by this change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1633781977 ## README.md: ## @@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework with powerful stream- Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/) +## Modules + +This repository contains the following modules + +* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector implementation; supports optional request signer +* [Sample application](./example-datastream-job): Sample application showing the usage of the connector with DataStream API. It also demonstrates how to configure the request signer. Review Comment: I moved the example application under tests, as suggested, merging everything in a single class. The usage of the AMP connector is commented out, being a separate dependency, but still present to show the usage, in case someone wants to use it. https://github.com/apache/flink-connector-prometheus/blob/5afb890aa8fe860ce25bcf89f7cdee93a6d2c4be/flink-connector-prometheus/src/test/java/org/apache/flink/connector/prometheus/sink/examples/DataStreamExample.java -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#issuecomment-2159216321 I moved the example application under tests and removed the separate module completely. I added links to the example application in the README. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34470][Connectors/Kafka] Fix indefinite blocking by adjusting stopping condition in split reader [flink-connector-kafka]
morazow commented on PR #100: URL: https://github.com/apache/flink-connector-kafka/pull/100#issuecomment-2159296755 Thanks @dongwoo6kim, tests look good from my side. (_Recently I faced a similar issue, which may be related, when running in batch mode with `startingOffsets` set. The change should solve that issue, but we may create a separate issue for it._) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
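The batch-mode scenario mentioned in the parentheses above — a bounded Kafka source with explicit starting offsets — can be sketched as follows. The bootstrap servers, topic and group id are made-up values; this only illustrates the configuration the comment refers to, not the fix itself.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class BoundedKafkaSourceExample {
    public static void main(String[] args) {
        // Bounded source: reads from the configured starting offsets up to the latest
        // offsets observed at job start; the split reader must then stop cleanly,
        // which is the stopping condition adjusted in FLINK-34470.
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("orders")
                        .setGroupId("example-group")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setBounded(OffsetsInitializer.latest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();
        System.out.println("Built source: " + source);
    }
}
```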
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1633910473 ## docs/layouts/shortcodes/generated/security_configuration.html: ## @@ -134,6 +134,12 @@ String The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server). + +security.ssl.internal.keystore-type +"pkcs12" Review Comment: You're right. Since I generated the documentation with Java 11, it output PKCS12. Do you know how I can stop the default value in the documentation from being generated and instead add it manually? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634004521 ## docs/layouts/shortcodes/generated/security_configuration.html: ## @@ -134,6 +134,12 @@ String The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server). + +security.ssl.internal.keystore-type +"pkcs12" Review Comment: I found an `OverrideDefault` annotation, will try that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
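For reference, a sketch of how a keystore-type option could pin its documented default via that annotation; the option key and description mirror the discussion above, and the annotation's exact package (`org.apache.flink.annotation.docs.Documentation`) and parameter text are assumptions, not copied from the PR.

```java
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.security.KeyStore;

public class KeystoreTypeOptionSketch {
    // The runtime default follows the JVM (KeyStore.getDefaultType() is "pkcs12" on newer
    // JVMs and "jks" on older ones), while the generated HTML shows a fixed, JVM-independent
    // text instead of whatever the doc-generation JVM happens to return.
    @Documentation.OverrideDefault("The default value of the JVM's KeyStore.getDefaultType()")
    public static final ConfigOption<String> SSL_INTERNAL_KEYSTORE_TYPE =
            ConfigOptions.key("security.ssl.internal.keystore-type")
                    .stringType()
                    .defaultValue(KeyStore.getDefaultType())
                    .withDescription(
                            "The type of the keystore for Flink's internal endpoints.");
}
```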
Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]
1996fanrui commented on PR #833: URL: https://github.com/apache/flink-kubernetes-operator/pull/833#issuecomment-2159610745 Thanks @ashangit for the fix and everyone for the review! Merging~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35489][Autoscaler] Ensure METASPACE size is computed before the heap [flink-kubernetes-operator]
1996fanrui merged PR #833: URL: https://github.com/apache/flink-kubernetes-operator/pull/833 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
1996fanrui commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634051534 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); +.map( +v -> { +Matcher matcher = partitionRegex.matcher(v); +if (matcher.matches()) { +String kafkaTopic = matcher.group("kafkaTopic"); +String kafkaId = matcher.group("kafkaId"); +String pulsarTopic = + matcher.group("pulsarTopic"); +String pulsarId = matcher.group("pulsarId"); +return kafkaTopic != null +? kafkaTopic + "-" + kafkaId +: pulsarTopic + "-" + pulsarId; Review Comment: This PR has not been updated for almost a month, and the author has not responded. The flink kubernetes operator will be released this week, so I will take over this PR if it's still not updated or responded within 24 hours. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
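For readability, the combined Kafka/Pulsar partition regex in the diff above, with the named capture groups that the `matcher.group("kafkaTopic")`/`group("kafkaId")`/`group("pulsarTopic")`/`group("pulsarId")` calls rely on, can be sketched as follows; the exact metric-name patterns are an approximation of what the PR matches.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionMetricRegexSketch {
    // Kafka branch: ...KafkaSourceReader.topic.<topic>.partition.<id>.currentOffset
    // Pulsar branch: ...PulsarConsumer.<topic>-partition-<id>...numMsgsReceived
    private static final Pattern PARTITION_REGEX =
            Pattern.compile(
                    "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                            + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");

    /** Returns a "topic-partition" key, or null if the metric name does not match. */
    static String partitionKey(String metricName) {
        Matcher matcher = PARTITION_REGEX.matcher(metricName);
        if (!matcher.matches()) {
            return null;
        }
        String kafkaTopic = matcher.group("kafkaTopic");
        return kafkaTopic != null
                ? kafkaTopic + "-" + matcher.group("kafkaId")
                : matcher.group("pulsarTopic") + "-" + matcher.group("pulsarId");
    }

    public static void main(String[] args) {
        // prints "orders-3"
        System.out.println(
                partitionKey("op.KafkaSourceReader.topic.orders.partition.3.currentOffset"));
    }
}
```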
Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]
reswqa commented on code in PR #24845: URL: https://github.com/apache/flink/pull/24845#discussion_r1634061063 ## flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SetSerializer.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A serializer for {@link java.util.Set}. The serializer relies on an element serializer for the + * serialization of the set's elements. + * + * The serialization format for the set is as follows: four bytes for the length of the set, + * followed by the serialized representation of each element. + * + * @param The type of element in the set. + */ +@Internal +public final class SetSerializer extends TypeSerializer> { + +private static final long serialVersionUID = 1L; + +/** The serializer for the elements of the set. */ +private final TypeSerializer elementSerializer; + +/** + * Creates a set serializer that uses the given serializer to serialize the set's elements. + * + * @param elementSerializer The serializer for the elements of the set + */ +public SetSerializer(TypeSerializer elementSerializer) { +this.elementSerializer = checkNotNull(elementSerializer); +} + +// +// SetSerializer specific properties +// + +/** + * Gets the serializer for the elements of the set. + * + * @return The serializer for the elements of the set + */ +public TypeSerializer getElementSerializer() { +return elementSerializer; +} + +// +// Type Serializer implementation +// + +@Override +public boolean isImmutableType() { +return false; +} + +@Override +public TypeSerializer> duplicate() { +TypeSerializer duplicateElement = elementSerializer.duplicate(); +return duplicateElement == elementSerializer ? 
this : new SetSerializer<>(duplicateElement); +} + +@Override +public Set createInstance() { +return new HashSet<>(0); +} + +@Override +public Set copy(Set from) { +Set newSet = new HashSet<>(from.size()); +for (T element : from) { +newSet.add(elementSerializer.copy(element)); +} +return newSet; +} + +@Override +public Set copy(Set from, Set reuse) { +return copy(from); +} + +@Override +public int getLength() { +return -1; // var length +} + +@Override +public void serialize(Set set, DataOutputView target) throws IOException { +final int size = set.size(); +target.writeInt(size); +for (T element : set) { +elementSerializer.serialize(element, target); Review Comment: > I also found that the serializing a list with null values will throw NPE as we reused the existing ListSerializer impl, which does not allow null values (it is only used for serializing ListState before where null value is explicitly forbidden in the contract). Directly adding null marker for it will break state compatibility, I plan to introduce a new list serializer that accepts null values for serializing user objects in a new JIRA, WDYT Make sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, ple
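The null-handling follow-up mentioned in the last review comment typically amounts to writing a per-element presence marker. Below is a minimal, standalone sketch of that idea using `String` elements and plain Java streams for brevity; it is not the serializer proposed for the follow-up JIRA.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class NullTolerantListFormatSketch {
    /** Writes the size, then a boolean "present" marker before every element. */
    static void serialize(List<String> list, DataOutputStream out) throws IOException {
        out.writeInt(list.size());
        for (String element : list) {
            out.writeBoolean(element != null);
            if (element != null) {
                out.writeUTF(element);
            }
        }
    }

    static List<String> deserialize(DataInputStream in) throws IOException {
        int size = in.readInt();
        List<String> list = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            list.add(in.readBoolean() ? in.readUTF() : null);
        }
        return list;
    }
}
```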
Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]
gong commented on PR #3207: URL: https://github.com/apache/flink-cdc/pull/3207#issuecomment-2159656572 > @gong would you like to open a PR for release-3.1 branch? ok, I will open the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]
gong commented on PR #3407: URL: https://github.com/apache/flink-cdc/pull/3407#issuecomment-2159661569 @leonardBang PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
wenbingshen commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634091321 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); +.map( +v -> { +Matcher matcher = partitionRegex.matcher(v); +if (matcher.matches()) { +String kafkaTopic = matcher.group("kafkaTopic"); +String kafkaId = matcher.group("kafkaId"); +String pulsarTopic = + matcher.group("pulsarTopic"); +String pulsarId = matcher.group("pulsarId"); +return kafkaTopic != null +? kafkaTopic + "-" + kafkaId +: pulsarTopic + "-" + pulsarId; Review Comment: Sorry, I will update this PR today. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]
cxzl25 commented on PR #24896: URL: https://github.com/apache/flink/pull/24896#issuecomment-2159691770 Gentle ping @xintongsong -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
wenbingshen commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634118107 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); +.map( +v -> { +Matcher matcher = partitionRegex.matcher(v); +if (matcher.matches()) { +String kafkaTopic = matcher.group("kafkaTopic"); +String kafkaId = matcher.group("kafkaId"); +String pulsarTopic = + matcher.group("pulsarTopic"); +String pulsarId = matcher.group("pulsarId"); +return kafkaTopic != null +? kafkaTopic + "-" + kafkaId +: pulsarTopic + "-" + pulsarId; Review Comment: @gyfora @1996fanrui I think the existing matching logic is not too complicated, and the PR is not too large. If the maximum parallelism of other source components is expanded in the future, it will not be too late to split it; If we want to split the matching logic of Kafka and Pulsar, we may need to perform two regular matching on the source metrics data set to obtain the number of partitions of these two queues separately, which does not seem to be necessary; It seems that there is no simple or matching. The current regular expression is or, and it is also distinguished in the map function. Maybe you have a better answer to tell me, thank you. I suggest keeping the logic of the existing PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32081][checkpoint] Compatibility between file-merging on and off across job runs [flink]
ljz2051 commented on code in PR #24873: URL: https://github.com/apache/flink/pull/24873#discussion_r1634123958 ## flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java: ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.core.execution.RestoreMode; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; + +import static org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * FileMerging Compatibility IT case which tests recovery from a checkpoint created in different + * fileMerging mode (i.e. fileMerging enabled/disabled). 
+ */ +public class SnapshotFileMergingCompatibilityITCase extends TestLogger { + +public static Collection parameters() { +return Arrays.asList( +new Object[][] { +{RestoreMode.CLAIM, true}, +{RestoreMode.CLAIM, false}, +{RestoreMode.NO_CLAIM, true}, +{RestoreMode.NO_CLAIM, false} +}); +} + +@ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = {1}") +@MethodSource("parameters") +public void testSwitchFromDisablingToEnablingFileMerging( +RestoreMode restoreMode, boolean fileMergingAcrossBoundary, @TempDir Path checkpointDir) +throws Exception { +testSwitchingFileMerging( +checkpointDir, false, true, restoreMode, fileMergingAcrossBoundary); +} + +@ParameterizedTest(name = "RestoreMode = {0}, fileMergingAcrossBoundary = {1}") +@MethodSource("parameters") +public void testSwitchFromEnablingToDisablingFileMerging( +RestoreMode restoreMode, boolean fileMergingAcrossBoundary, @TempDir Path checkpointDir) +throws Exception { +testSwitchingFileMerging( +checkpointDir, true, false, restoreMode, fileMergingAcrossBoundary); +} + +private void testSwitchingFileMerging( +Path checkpointDir, +boolean firstFileMergingSwitch, +boolean secondFileMergingSwitch, +RestoreMode restoreMode, +boolean fileMergingAcrossBoundary) +throws Exception { +final Configuration config = new Configuration(); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toUri().toString()); +config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); +config.set(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY, fileMergingAcrossBoundary); +config.set(CheckpointingOptions.FILE_MERGING_ENABLED, firstFileMergingSwitch); +MiniClusterWithClientResource firstCluster = +new MiniClusterWithClientResource( +new MiniClusterResourceConfiguration.Builder() +.setConfiguration(config) +.setNumberTaskManagers(2) +.setNumberSlotsPerTaskManager(2) +.build()); +EmbeddedRocksDBStateBackend stateBackend1 = new EmbeddedRocksDBStateBackend(); +stateBackend1.configure(config, Thread.currentThread().getContextClassLoader()); +firstCluster.before(); +String externalCheckpoint; +try { +externalCheckpoint = +runJobAndGetExternalizedCheckpoint( +stateBackend1, null, firstCluster, restoreM
Re: [PR] [FLINK-35467][cdc-dist][bin] Respect externally set FLINK_CONF_DIR for CDC task configuration. [flink-cdc]
yuxiqian commented on code in PR #3398: URL: https://github.com/apache/flink-cdc/pull/3398#discussion_r1634142791 ## flink-cdc-dist/src/main/flink-cdc-bin/bin/flink-cdc.sh: ## @@ -34,11 +34,11 @@ if [[ -z $FLINK_HOME ]]; then exit 1 fi -# Setup Flink related configurations -# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it -_FLINK_HOME_DETERMINED=1 Review Comment: Should `_FLINK_HOME_DETERMINED` env var be preserved here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634153986 ## flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java: ## @@ -306,6 +341,19 @@ void testInternalSSLWrongKeyPassword(String sslProvider) { .isInstanceOf(Exception.class); } +@ParameterizedTest +@MethodSource("parameters") +void testInternalSSLWrongKeystoreType(String sslProvider) { +final Configuration config = createInternalSslConfigWithKeyAndTrustStores(sslProvider); +config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE, "JCEKS"); + +assertThatThrownBy(() -> SSLUtils.createInternalServerSSLEngineFactory(config)) +.isInstanceOf(java.io.IOException.class); Review Comment: Addressed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634154163 ## flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java: ## @@ -148,6 +148,17 @@ void testRESTClientSSLWrongPassword(String sslProvider) { .isInstanceOf(Exception.class); } +/** Tests that REST Client SSL Engine creation fails with bad SSL configuration. */ +@ParameterizedTest +@MethodSource("parameters") +void testRESTClientSSLBadTruststoreType(String sslProvider) { +Configuration clientConfig = createRestSslConfigWithTrustStore(sslProvider); +clientConfig.set(SecurityOptions.SSL_REST_TRUSTSTORE_TYPE, "BKS"); Review Comment: Changed the tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634155084 ## flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java: ## @@ -59,13 +71,38 @@ public TrustManager[] trustManagers() { .fingerprints(sslCertFingerprints) .build(); -trustManagerFactory.init(loadKeystore(sslTrustStore, sslTrustStorePassword)); +trustManagerFactory.init( +loadKeystore(sslTrustStore, sslTrustStorePassword, sslTrustStoreType)); return trustManagerFactory.getTrustManagers(); -} catch (GeneralSecurityException e) { +} catch (GeneralSecurityException | IOException e) { // replicate exception handling from SSLEngineProvider throw new RemoteTransportException( "Server SSL connection could not be established because SSL context could not be constructed", e); } } + +@Override +public KeyStore loadKeystore(String filename, String password) { +try { +return loadKeystore(filename, password, sslKeyStoreType); +} catch (IOException +| CertificateException +| NoSuchAlgorithmException +| KeyStoreException e) { +throw new RemoteTransportException( Review Comment: I simplified it a bit to catch GeneralSecurityException, but I think that's the most we can do. The Scala base class doesn't throw any exceptions, so we have to catch it in the Java override. ## docs/layouts/shortcodes/generated/security_configuration.html: ## @@ -134,6 +134,12 @@ String The secret to decrypt the keystore file for Flink's for Flink's internal endpoints (rpc, data transport, blob server). + +security.ssl.internal.keystore-type +"pkcs12" Review Comment: updated documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
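For reference, the type-aware keystore loading that the overridden `loadKeystore(filename, password)` above delegates to boils down to standard JDK calls. A simplified sketch; the real code may read the file through Flink's own abstractions, which is assumed away here.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;

public class KeystoreLoadingSketch {
    /** Loads a keystore of the configured type (e.g. "JKS", "PKCS12") from a file. */
    static KeyStore loadKeystore(String filename, String password, String keystoreType)
            throws IOException, GeneralSecurityException {
        // KeyStore.getInstance fails with a GeneralSecurityException for unknown types,
        // which matches the exception handling discussed in the review thread.
        KeyStore keyStore = KeyStore.getInstance(keystoreType);
        try (InputStream in = Files.newInputStream(Paths.get(filename))) {
            keyStore.load(in, password.toCharArray());
        }
        return keyStore;
    }
}
```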
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master commented on PR #24919: URL: https://github.com/apache/flink/pull/24919#issuecomment-2159778519 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]
liuyongvs commented on PR #24773: URL: https://github.com/apache/flink/pull/24773#issuecomment-2159808362 hi @snuyanzin @superdiaodiao do we need to support an encoding argument? db2: https://www.ibm.com/docs/en/db2-for-zos/12?topic=functions-urlencode-urldecode max compute: https://www.alibabacloud.com/help/en/maxcompute/user-guide/url-decode -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]
xintongsong commented on PR #24896: URL: https://github.com/apache/flink/pull/24896#issuecomment-2159816793 > Thanks, I've updated. The commit message is not yet updated. Have you forgotten to push the changes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]
superdiaodiao commented on PR #24773: URL: https://github.com/apache/flink/pull/24773#issuecomment-2159830832 > hi @snuyanzin @superdiaodiao do we need to support an encoding argument? > db2: https://www.ibm.com/docs/en/db2-for-zos/12?topic=functions-urlencode-urldecode > max compute: https://www.alibabacloud.com/help/en/maxcompute/user-guide/url-decode Calcite and Spark also need only one arg; it is enough to handle these cases, and UTF-8 can cover them all. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
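A plain-JDK illustration of the point that a single-argument, UTF-8-only URL_ENCODE/URL_DECODE loses nothing; this uses `java.net` directly, not the proposed Flink built-in functions.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class UrlCodecExample {
    public static void main(String[] args) {
        // UTF-8 can represent any string, so a one-argument URL_ENCODE/URL_DECODE that is
        // hard-wired to UTF-8 round-trips arbitrary input without an encoding parameter.
        String raw = "a b/c?x=äöü";
        String encoded = URLEncoder.encode(raw, StandardCharsets.UTF_8);
        String decoded = URLDecoder.decode(encoded, StandardCharsets.UTF_8);
        System.out.println(encoded);             // a+b%2Fc%3Fx%3D%C3%A4%C3%B6%C3%BC
        System.out.println(raw.equals(decoded)); // true
    }
}
```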
Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]
cxzl25 commented on PR #24896: URL: https://github.com/apache/flink/pull/24896#issuecomment-2159856148 > The commit message is not yet update Sorry, it's updated now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]
ramkrish86 commented on code in PR #24896: URL: https://github.com/apache/flink/pull/24896#discussion_r1634237585 ## flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java: ## @@ -59,7 +59,7 @@ public void write(byte[] b, int off, int len) throws IOException { @Override public void flush() throws IOException { -out.hsync(); +out.hflush(); Review Comment: I checked this change. In the ABFS case, hflush and hsync internally do the same thing, I believe, so that functionality should not break. In the HDFS case, yes, hsync can make things costly. Thanks for fixing this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
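To make the cost difference discussed above concrete, here is a minimal sketch of the flush/sync split on a Hadoop FSDataOutputStream. The class and field names are illustrative; the real logic lives in BaseHadoopFsRecoverableFsDataOutputStream.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;

/**
 * Minimal sketch of the flush/sync split discussed above. Names are illustrative,
 * not the actual Flink class.
 */
class RecoverableStreamSketch {

    private final FSDataOutputStream out;

    RecoverableStreamSketch(FSDataOutputStream out) {
        this.out = out;
    }

    /** Frequent flushes only push buffered bytes to the remote service (cheap). */
    public void flush() throws IOException {
        out.hflush();
    }

    /** Durability points (e.g. persisting for a checkpoint) still force data to disk (expensive). */
    public void sync() throws IOException {
        out.hsync();
    }
}
```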
Re: [PR] [FLINK-35533][runtime] Support Flink hybrid shuffle integration with Apache Celeborn [flink]
reswqa commented on code in PR #24900: URL: https://github.com/apache/flink/pull/24900#discussion_r1634238153 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java: ## @@ -148,12 +162,52 @@ public void close() throws IOException { // private List createTierConsumerAgents( -List tieredStorageConsumerSpecs) { +List tieredStorageConsumerSpecs, +List> shuffleDescriptors) { ArrayList tierConsumerAgents = new ArrayList<>(); + +List> transformedTierShuffleDescriptors = +transformTierShuffleDescriptors(shuffleDescriptors); +// Each tier only requires one inner list of transformedTierShuffleDescriptors, so the size +// of transformedTierShuffleDescriptors and the size of tierFactories are the same. +checkState(transformedTierShuffleDescriptors.size() == tierFactories.size()); +int index = 0; Review Comment: Why not use `for-i` loop if we do need the iteration index? ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java: ## @@ -148,12 +162,52 @@ public void close() throws IOException { // private List createTierConsumerAgents( -List tieredStorageConsumerSpecs) { +List tieredStorageConsumerSpecs, +List> shuffleDescriptors) { ArrayList tierConsumerAgents = new ArrayList<>(); + +List> transformedTierShuffleDescriptors = +transformTierShuffleDescriptors(shuffleDescriptors); +// Each tier only requires one inner list of transformedTierShuffleDescriptors, so the size +// of transformedTierShuffleDescriptors and the size of tierFactories are the same. +checkState(transformedTierShuffleDescriptors.size() == tierFactories.size()); +int index = 0; for (TierFactory tierFactory : tierFactories) { tierConsumerAgents.add( - tierFactory.createConsumerAgent(tieredStorageConsumerSpecs, nettyService)); +tierFactory.createConsumerAgent( +tieredStorageConsumerSpecs, +transformedTierShuffleDescriptors.get(index++), +nettyService)); } return tierConsumerAgents; } + +/** + * Before transforming the shuffle descriptors, the number of tier shuffle descriptors is + * numPartitions * numTiers (That means shuffleDescriptors.size() is numPartitions, while the + * shuffleDescriptors.get(0).size() is numTiers). After transforming, the number of tier shuffle + * descriptors is numTiers * numPartitions (That means transformedList.size() is numTiers, while + * transformedList.get(0).size() is numPartitions). + */ +private static List> transformTierShuffleDescriptors( +List> shuffleDescriptors) { +int numTiers = 0; +int numDescriptors = shuffleDescriptors.size(); Review Comment: Can be `numPartitions` according to the java doc. ## flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java: ## @@ -490,6 +490,24 @@ public enum CompressionCodec { + " is configured. The new mode is currently in an experimental phase. It can be set to false to fallback to the legacy mode " + " if something unexpected. Once the new mode reaches a stable state, the legacy mode as well as the option will be removed."); +/** The option to configure the tiered factory creator remote class name for hybrid shuffle. 
*/ +@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) +@Experimental +public static final ConfigOption +NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME = + key("taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class") +.stringType() +.noDefaultValue() +.withDescription( +"The option configures the class that is responsible for creating an " ++ "external remote tier factory for hybrid shuffle. Note that " ++ "only Celeborn can be accepted as the remote shuffle tier " Review Comment: only Celeborn -> only Apache Celeborn How do we make sure that only Apache Celeborn is the valid option? I didn't notice if we have the corresponding validation. ## flink-runtime/src/test/jav
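The javadoc in the hunk quoted above describes a plain transposition: a numPartitions x numTiers nesting becomes numTiers x numPartitions. A generic sketch of that step, with names of my own choosing rather than the actual Flink method, could look like the following; the real code would additionally need whatever null/empty handling the reviewers call out.

```java
import java.util.ArrayList;
import java.util.List;

/** Generic sketch of the descriptor transposition described in the javadoc above. */
final class TransposeSketch {

    /** Turns a numPartitions x numTiers nesting into numTiers x numPartitions. */
    static <T> List<List<T>> transpose(List<List<T>> byPartition) {
        int numPartitions = byPartition.size();
        int numTiers = byPartition.isEmpty() ? 0 : byPartition.get(0).size();

        List<List<T>> byTier = new ArrayList<>(numTiers);
        for (int tier = 0; tier < numTiers; tier++) {
            List<T> forTier = new ArrayList<>(numPartitions);
            for (int partition = 0; partition < numPartitions; partition++) {
                forTier.add(byPartition.get(partition).get(tier));
            }
            byTier.add(forTier);
        }
        return byTier;
    }
}
```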
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
gyfora commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634247827 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); +.map( +v -> { +Matcher matcher = partitionRegex.matcher(v); +if (matcher.matches()) { +String kafkaTopic = matcher.group("kafkaTopic"); +String kafkaId = matcher.group("kafkaId"); +String pulsarTopic = + matcher.group("pulsarTopic"); +String pulsarId = matcher.group("pulsarId"); +return kafkaTopic != null +? kafkaTopic + "-" + kafkaId +: pulsarTopic + "-" + pulsarId; Review Comment: fair enough, we can keep it like this for now but if we add more similar logic then we need to split it up :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
gyfora merged PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]
X-czh commented on PR #24845: URL: https://github.com/apache/flink/pull/24845#issuecomment-2159873214 rebased -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35052] Reject unsupported versions in the webhook validator [flink-kubernetes-operator]
gyfora merged PR #831: URL: https://github.com/apache/flink-kubernetes-operator/pull/831 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Adding affinity rule. [flink-kubernetes-operator]
gyfora closed pull request #737: Adding affinity rule. URL: https://github.com/apache/flink-kubernetes-operator/pull/737 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Adding affinity rule. [flink-kubernetes-operator]
gyfora commented on PR #737: URL: https://github.com/apache/flink-kubernetes-operator/pull/737#issuecomment-2159881613 I am closing this PR due to inactivity, please re-open if you pick it up again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
wenbingshen commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1634261181 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); +.map( +v -> { +Matcher matcher = partitionRegex.matcher(v); +if (matcher.matches()) { +String kafkaTopic = matcher.group("kafkaTopic"); +String kafkaId = matcher.group("kafkaId"); +String pulsarTopic = + matcher.group("pulsarTopic"); +String pulsarId = matcher.group("pulsarId"); +return kafkaTopic != null +? kafkaTopic + "-" + kafkaId +: pulsarTopic + "-" + pulsarId; Review Comment: Sure. Thanks! :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
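For readers following the regex discussion above, here is a self-contained sketch of the matching logic: the same pattern shape with named groups, counting distinct topic-partition keys instead of raw matches. The sample metric names in main are made up purely for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch of counting source partitions from metric names via named regex groups. */
final class PartitionCountSketch {

    private static final Pattern PARTITION_PATTERN =
            Pattern.compile(
                    "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                            + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");

    static long countDistinctPartitions(List<String> metricNames) {
        return metricNames.stream()
                .map(PARTITION_PATTERN::matcher)
                .filter(Matcher::matches)
                .map(
                        m ->
                                m.group("kafkaTopic") != null
                                        ? m.group("kafkaTopic") + "-" + m.group("kafkaId")
                                        : m.group("pulsarTopic") + "-" + m.group("pulsarId"))
                .distinct()
                .count();
    }

    public static void main(String[] args) {
        // Hypothetical metric names, only for illustration.
        List<String> names =
                Arrays.asList(
                        "op.KafkaSourceReader.topic.orders.partition.0.currentOffset",
                        "op.KafkaSourceReader.topic.orders.partition.1.currentOffset",
                        "op.PulsarConsumer.clicks-partition-0.consumerName.numMsgsReceived");
        System.out.println(countDistinctPartitions(names)); // 3
    }
}
```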
Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]
xintongsong commented on PR #24896: URL: https://github.com/apache/flink/pull/24896#issuecomment-2159897218 Thanks, @cxzl25. Merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream [flink]
xintongsong closed pull request #24896: [FLINK-35531] Avoid calling hsync in flush method in BaseHadoopFsRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/24896 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] FLINK-35402 [flink-operator][Deployment] add labels to metadata [flink-kubernetes-operator]
gyfora merged PR #829: URL: https://github.com/apache/flink-kubernetes-operator/pull/829 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]
Zakelly commented on code in PR #24895: URL: https://github.com/apache/flink/pull/24895#discussion_r1634298332 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java: ## @@ -307,18 +307,37 @@ void onProcessingTime(long time) throws Exception { } public void advanceWatermark(long time) throws Exception { -currentWatermark = time; +Preconditions.checkState( +tryAdvanceWatermark( +time, +() -> { +// Never stop advancing. +return false; +})); +} +/** + * @return true if following watermarks can be processed immediately. False if the firing timers + * should be interrupted as soon as possible. + */ +public boolean tryAdvanceWatermark( +long time, InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopAdvancingFn) +throws Exception { +currentWatermark = time; InternalTimer timer; - +boolean stop = cancellationContext.isCancelled(); while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time -&& !cancellationContext.isCancelled()) { +&& !stop) { keyContext.setCurrentKey(timer.getKey()); eventTimeTimersQueue.poll(); triggerTarget.onEventTime(timer); taskIOMetricGroup.getNumFiredTimers().inc(); +// Check if we should stop advancing after at least one iteration to guarantee progress +// and prevent a potential starvation. +stop = cancellationContext.isCancelled() || shouldStopAdvancingFn.test(); } +return true; Review Comment: Should be `return stop` ? ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java: ## @@ -307,18 +307,37 @@ void onProcessingTime(long time) throws Exception { } public void advanceWatermark(long time) throws Exception { -currentWatermark = time; +Preconditions.checkState( +tryAdvanceWatermark( +time, +() -> { +// Never stop advancing. +return false; +})); +} +/** + * @return true if following watermarks can be processed immediately. False if the firing timers + * should be interrupted as soon as possible. + */ +public boolean tryAdvanceWatermark( +long time, InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopAdvancingFn) +throws Exception { +currentWatermark = time; InternalTimer timer; - +boolean stop = cancellationContext.isCancelled(); while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time -&& !cancellationContext.isCancelled()) { +&& !stop) { keyContext.setCurrentKey(timer.getKey()); eventTimeTimersQueue.poll(); triggerTarget.onEventTime(timer); taskIOMetricGroup.getNumFiredTimers().inc(); +// Check if we should stop advancing after at least one iteration to guarantee progress +// and prevent a potential starvation. +stop = cancellationContext.isCancelled() || shouldStopAdvancingFn.test(); } +return true; Review Comment: Should be `return !stop` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]
pnowojski commented on code in PR #24895: URL: https://github.com/apache/flink/pull/24895#discussion_r1634312543 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java: ## @@ -307,18 +307,37 @@ void onProcessingTime(long time) throws Exception { } public void advanceWatermark(long time) throws Exception { -currentWatermark = time; +Preconditions.checkState( +tryAdvanceWatermark( +time, +() -> { +// Never stop advancing. +return false; +})); +} +/** + * @return true if following watermarks can be processed immediately. False if the firing timers + * should be interrupted as soon as possible. + */ +public boolean tryAdvanceWatermark( +long time, InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopAdvancingFn) +throws Exception { +currentWatermark = time; InternalTimer timer; - +boolean stop = cancellationContext.isCancelled(); while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time -&& !cancellationContext.isCancelled()) { +&& !stop) { keyContext.setCurrentKey(timer.getKey()); eventTimeTimersQueue.poll(); triggerTarget.onEventTime(timer); taskIOMetricGroup.getNumFiredTimers().inc(); +// Check if we should stop advancing after at least one iteration to guarantee progress +// and prevent a potential starvation. +stop = cancellationContext.isCancelled() || shouldStopAdvancingFn.test(); } +return true; Review Comment: That explains failing test :) Thanks for spotting it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
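To make the agreed fix concrete, here is a stripped-down skeleton of the loop with the return value the reviewers converged on (`return !stop`). The types and helper methods are simplified placeholders, not the actual Flink classes.

```java
/**
 * Stripped-down sketch of the control flow discussed above; helpers are simplified
 * placeholders, not the actual Flink timer service.
 */
interface TimerLoopSketch {

    boolean hasEligibleTimer(long time);

    void fireNextTimer() throws Exception;

    boolean shouldStopAdvancing();

    /** Returns true if following watermarks can be processed immediately, false if interrupted. */
    default boolean tryAdvanceWatermark(long time) throws Exception {
        boolean stop = shouldStopAdvancing();
        while (hasEligibleTimer(time) && !stop) {
            fireNextTimer();
            // Re-check only after at least one timer fired, to guarantee progress.
            stop = shouldStopAdvancing();
        }
        return !stop; // the fix: returning true unconditionally would hide the interruption
    }
}
```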
Re: [PR] [hotfix][cdc-connector][mysql] skip SchemaChangeEvents that were not included in capturedTableFilter. [flink-cdc]
steveGuRen commented on PR #2986: URL: https://github.com/apache/flink-cdc/pull/2986#issuecomment-2159992536 @joyCurry30 @leonardBang Logically, it is a bug that I have to read all tables in the schema, and yet the first execution of the "alter table" SQL is not triggered. It only works today because TableDiscoveryUtils loads all tables. But in a SaaS environment with so many tables (five hundred thousand, since each tenant has its own tables), this causes issues: we cannot load all tables when we have to discover a new table by running a "show create table" query for every table. So I fixed TableDiscoveryUtils to load only a single table at startup or on a DDL change, and updated the shouldEmit logic too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
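As a rough illustration of the "load only the affected table" idea described above, a single-table lookup over plain JDBC might look like the sketch below. This is not the actual TableDiscoveryUtils API; identifier quoting and error handling are simplified.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Optional;

/** Sketch: discover a single table's DDL on demand instead of listing every table. */
final class SingleTableDiscoverySketch {

    /** Returns the CREATE TABLE statement for one table, or empty if it does not exist. */
    static Optional<String> showCreateTable(Connection connection, String database, String table)
            throws SQLException {
        String sql = String.format("SHOW CREATE TABLE `%s`.`%s`", database, table);
        try (Statement statement = connection.createStatement();
                ResultSet rs = statement.executeQuery(sql)) {
            // MySQL returns the DDL in the second column of the single result row.
            return rs.next() ? Optional.of(rs.getString(2)) : Optional.empty();
        }
    }
}
```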
Re: [PR] [hotfix][cdc-connector][mysql] skip SchemaChangeEvents that were not included in capturedTableFilter. [flink-cdc]
steveGuRen commented on PR #2986: URL: https://github.com/apache/flink-cdc/pull/2986#issuecomment-216384 In the design of TiDB, I find that they take into account the scenario of having a large number of tables in a SaaS environment. Therefore, special attention is given to the performance of DDL operations in such contexts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]
gaborgsomogyi commented on code in PR #24914: URL: https://github.com/apache/flink/pull/24914#discussion_r1634351465 ## flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java: ## @@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter @Nullable final Path targetPath; -private CompactingFileWriter.Type writeType = null; +private Type writeType = null; Review Comment: Reading the code and seeing a bare `Type` requires some digging. Changing the name to `WriterType` would be unnecessary duplication, so I would vote for keeping it prefixed with the class it comes from. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add Flink 1.19.1 release [flink-web]
leonardBang commented on code in PR #745: URL: https://github.com/apache/flink-web/pull/745#discussion_r1634358337 ## docs/content/posts/2024-06-14-release-1.19.1.md: ## @@ -0,0 +1,128 @@ +--- +title: "Apache Flink 1.19.1 Release Announcement" +date: "2024-06-14T00:00:00.000Z" +aliases: +- /news/2024/06/14/release-1.19.1.html Review Comment: Tip: please update the date when you fire the release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]
hlteoh37 merged PR #142: URL: https://github.com/apache/flink-connector-aws/pull/142 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]
reswqa merged PR #24845: URL: https://github.com/apache/flink/pull/24845 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35282][FLINK-35520] PyFlink Support for Apache Beam > 2.49 [flink]
hlteoh37 commented on code in PR #24908: URL: https://github.com/apache/flink/pull/24908#discussion_r1634474656 ## docs/layouts/shortcodes/generated/python_configuration.html: ## @@ -24,7 +24,7 @@ python.executable "python" String -Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0), Pip (version >= 20.3) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec". + Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.52.0), Pip (version >= 20.3) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec". Review Comment: Verified that Beam >= 2.48 still supports Python 3.8+ ## pom.xml: ## @@ -160,7 +160,7 @@ under the License. 1.3 3.23.1 0.10.9.7 - 2.43.0 Review Comment: Could we update this to 2.54? This mitigates additional CVEs from netty that we pull in via Beam https://github.com/apache/beam/issues/29861 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32081][checkpoint] Compatibility between file-merging on and off across job runs [flink]
Zakelly closed pull request #24873: [FLINK-32081][checkpoint] Compatibility between file-merging on and off across job runs URL: https://github.com/apache/flink/pull/24873 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35540][cdc-common][cdc-connector][mysql] fix lost table when database and table are with the same name [flink-cdc]
leonardBang merged PR #3396: URL: https://github.com/apache/flink-cdc/pull/3396 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]
GOODBOY008 opened a new pull request, #3408: URL: https://github.com/apache/flink-cdc/pull/3408 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]
GOODBOY008 commented on PR #3408: URL: https://github.com/apache/flink-cdc/pull/3408#issuecomment-2160480202 @leonardBang PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]
leonardBang merged PR #3408: URL: https://github.com/apache/flink-cdc/pull/3408 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]
leonardBang merged PR #3407: URL: https://github.com/apache/flink-cdc/pull/3407 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]
morazow commented on PR #24426: URL: https://github.com/apache/flink/pull/24426#issuecomment-2160556166 Thanks @HuangXingBo for the update! I have changed the GitHub actions to run the `auditwheel` repair after installing `patchelf` in Linux. Could you please have another look? You can find the build wheel artifacts for this change here: https://github.com/morazow/flink/actions/runs/9464510932 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]
ferenc-csaky commented on code in PR #24914: URL: https://github.com/apache/flink/pull/24914#discussion_r1634722169 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ## @@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } -private static Path assembleCompactedFilePath(Path uncompactedPath) { -String uncompactedName = uncompactedPath.getName(); -if (uncompactedName.startsWith(".")) { -uncompactedName = uncompactedName.substring(1); -} -return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); -} - -private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { +@VisibleForTesting +static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { -return CompactingFileWriter.Type.OUTPUT_STREAM; +return fileCompactor instanceof ConcatFileCompactor +&& ((ConcatFileCompactor) fileCompactor).isCompressed() +? Type.COMPRESSED_STREAM +: Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { -return CompactingFileWriter.Type.RECORD_WISE; +return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + +private static Path assembleCompactedFilePath(Path uncompactedPath) { +String uncompactedName = uncompactedPath.getName(); +if (uncompactedName.startsWith(".")) { +uncompactedName = uncompactedName.substring(1); +} +return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); Review Comment: I did not touched this method, I just moved the getWriterType above this to keep the method visibility order in the file intact, because that method became package-private. ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ## @@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } -private static Path assembleCompactedFilePath(Path uncompactedPath) { -String uncompactedName = uncompactedPath.getName(); -if (uncompactedName.startsWith(".")) { -uncompactedName = uncompactedName.substring(1); -} -return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); -} - -private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { +@VisibleForTesting +static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { -return CompactingFileWriter.Type.OUTPUT_STREAM; +return fileCompactor instanceof ConcatFileCompactor +&& ((ConcatFileCompactor) fileCompactor).isCompressed() +? 
Type.COMPRESSED_STREAM +: Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { -return CompactingFileWriter.Type.RECORD_WISE; +return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + +private static Path assembleCompactedFilePath(Path uncompactedPath) { +String uncompactedName = uncompactedPath.getName(); +if (uncompactedName.startsWith(".")) { +uncompactedName = uncompactedName.substring(1); +} +return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); Review Comment: I did not touch this method, I just moved the getWriterType above this to keep the method visibility order in the file intact, because that method became package-private. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]
ferenc-csaky commented on code in PR #24914: URL: https://github.com/apache/flink/pull/24914#discussion_r1634721788 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ## @@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } -private static Path assembleCompactedFilePath(Path uncompactedPath) { -String uncompactedName = uncompactedPath.getName(); -if (uncompactedName.startsWith(".")) { -uncompactedName = uncompactedName.substring(1); -} -return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); -} - -private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { +@VisibleForTesting +static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { -return CompactingFileWriter.Type.OUTPUT_STREAM; +return fileCompactor instanceof ConcatFileCompactor +&& ((ConcatFileCompactor) fileCompactor).isCompressed() +? Type.COMPRESSED_STREAM +: Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { -return CompactingFileWriter.Type.RECORD_WISE; +return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + +private static Path assembleCompactedFilePath(Path uncompactedPath) { +String uncompactedName = uncompactedPath.getName(); +if (uncompactedName.startsWith(".")) { Review Comment: I did not touched this method, I just moved the `getWriterType` above this to keep the method visibility order in the file intact, because that method became package-private. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]
ferenc-csaky commented on code in PR #24914: URL: https://github.com/apache/flink/pull/24914#discussion_r1634721788 ## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java: ## @@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest request) throws IOExceptio return compactingFiles; } -private static Path assembleCompactedFilePath(Path uncompactedPath) { -String uncompactedName = uncompactedPath.getName(); -if (uncompactedName.startsWith(".")) { -uncompactedName = uncompactedName.substring(1); -} -return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); -} - -private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { +@VisibleForTesting +static Type getWriterType(FileCompactor fileCompactor) { if (fileCompactor instanceof OutputStreamBasedFileCompactor) { -return CompactingFileWriter.Type.OUTPUT_STREAM; +return fileCompactor instanceof ConcatFileCompactor +&& ((ConcatFileCompactor) fileCompactor).isCompressed() +? Type.COMPRESSED_STREAM +: Type.OUTPUT_STREAM; } else if (fileCompactor instanceof RecordWiseFileCompactor) { -return CompactingFileWriter.Type.RECORD_WISE; +return Type.RECORD_WISE; } else { throw new UnsupportedOperationException( "Unable to crate compacting file writer for compactor:" + fileCompactor.getClass()); } } + +private static Path assembleCompactedFilePath(Path uncompactedPath) { +String uncompactedName = uncompactedPath.getName(); +if (uncompactedName.startsWith(".")) { Review Comment: I did not touch this method, I just moved the `getWriterType` above this to keep the method visibility order in the file intact, because that method became package-private. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35543][HIVE] Upgrade Hive 2.3 connector to version 2.3.10 [flink]
pan3793 commented on PR #24905: URL: https://github.com/apache/flink/pull/24905#issuecomment-2160590805 ping @JingGe, is there anything I can do before merging? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35540][cdc-common] Fix table missed when database and table are with the same name in release-3.1.1 [flink-cdc]
qg-lin opened a new pull request, #3409: URL: https://github.com/apache/flink-cdc/pull/3409 https://issues.apache.org/jira/browse/FLINK-35540 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]
hlteoh37 commented on code in PR #136: URL: https://github.com/apache/flink-connector-aws/pull/136#discussion_r1634758095 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java: ## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.NumericTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider; +import org.apache.flink.types.Row; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter; +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider; +import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType; +import software.amazon.awssdk.enhanced.dynamodb.EnhancedType; +import software.amazon.awssdk.enhanced.dynamodb.TableSchema; +import software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter; +import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.beans.BeanInfo; +import java.beans.IntrospectionException; +import java.beans.Introspector; +import java.beans.PropertyDescriptor; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Function; + +/** + * A {@link ElementConverter} that converts an element to a {@link DynamoDbWriteRequest} using + * TypeInformation provided. 
+ */ +@PublicEvolving +public class DynamoDbTypeInformedElementConverter +implements ElementConverter { + +private final CompositeType typeInfo; +private final boolean ignoreNulls; +private final TableSchema tableSchema; + +/** + * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an element to a {@link + * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: {@code new + * DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))} + * + * @param typeInfo The {@link CompositeType} that provides the type information for the element. + */ +public DynamoDbTypeInformedElementConverter(CompositeType typeInfo) { +this(typeInfo, true); +} + +public DynamoDbTypeInformedElementConverter(CompositeType typeInfo, boolean ignoreNulls) { + +try { +this.typeInfo = typeInfo; +this.ignoreNulls = ignoreNulls; +this.tableSchema = createTableSchema(typeInfo); +} catch (IntrospectionException | IllegalStateException | IllegalArgumentException e) { +throw new FlinkRuntimeException("Failed to extract DynamoDb table schema", e); +} +} + +@Override +public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) { +Preconditions.checkNotNull(tableSchema, "TableSchema is not initialized"); +try { +return DynamoDbWriteRequest.builder() +.setType(DynamoDbWriteRequestType.PUT) +.setItem(tableSchema.itemToMap(input, ignoreNulls)) +.build(); +} catch (ClassCastExcept
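Based on the javadoc visible in the quoted code, a usage sketch of the converter could look like the following. The Order POJO and its fields are made up, and the wiring into the DynamoDB sink builder is only hinted at in a comment, since that API is not shown here.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.connector.dynamodb.sink.DynamoDbTypeInformedElementConverter;

/** Usage sketch for the type-informed element converter, following the javadoc above. */
public class OrderConverterExample {

    /** Hypothetical POJO; Flink derives a composite (POJO) type for it. */
    public static class Order {
        public String orderId;
        public int quantity;

        public Order() {}
    }

    public static void main(String[] args) {
        CompositeType<Order> typeInfo = (CompositeType<Order>) TypeInformation.of(Order.class);
        DynamoDbTypeInformedElementConverter<Order> converter =
                new DynamoDbTypeInformedElementConverter<>(typeInfo);
        // converter.apply(order, context) produces a PUT DynamoDbWriteRequest, so the converter
        // can be handed to the DynamoDB sink as its element converter.
    }
}
```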
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
gaborgsomogyi commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634845033 ## flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java: ## @@ -59,13 +71,38 @@ public TrustManager[] trustManagers() { .fingerprints(sslCertFingerprints) .build(); -trustManagerFactory.init(loadKeystore(sslTrustStore, sslTrustStorePassword)); +trustManagerFactory.init( +loadKeystore(sslTrustStore, sslTrustStorePassword, sslTrustStoreType)); return trustManagerFactory.getTrustManagers(); -} catch (GeneralSecurityException e) { +} catch (GeneralSecurityException | IOException e) { // replicate exception handling from SSLEngineProvider throw new RemoteTransportException( "Server SSL connection could not be established because SSL context could not be constructed", e); } } + +@Override +public KeyStore loadKeystore(String filename, String password) { +try { +return loadKeystore(filename, password, sslKeyStoreType); +} catch (IOException +| CertificateException +| NoSuchAlgorithmException +| KeyStoreException e) { +throw new RemoteTransportException( Review Comment: I've checked out the code and played with it. I agree that's what we can do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
gaborgsomogyi commented on code in PR #24919: URL: https://github.com/apache/flink/pull/24919#discussion_r1634861389 ## flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java: ## @@ -285,7 +291,14 @@ private static KeyManagerFactory getKeyManagerFactory( : SecurityOptions.SSL_REST_KEY_PASSWORD, SecurityOptions.SSL_KEY_PASSWORD); -KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); +// do not use getAndCheckOption here as there is no fallback option and a default is +// specified +String keystoreType = +internal +? config.get(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE) +: config.get(SecurityOptions.SSL_REST_KEYSTORE_TYPE); Review Comment: Just a clarification for other reviewers, since there is default value it just doesn't make sense to provide fallback. ## flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java: ## @@ -59,13 +68,35 @@ public TrustManager[] trustManagers() { .fingerprints(sslCertFingerprints) .build(); -trustManagerFactory.init(loadKeystore(sslTrustStore, sslTrustStorePassword)); +trustManagerFactory.init( +loadKeystore(sslTrustStore, sslTrustStorePassword, sslTrustStoreType)); return trustManagerFactory.getTrustManagers(); -} catch (GeneralSecurityException e) { +} catch (GeneralSecurityException | IOException e) { // replicate exception handling from SSLEngineProvider throw new RemoteTransportException( "Server SSL connection could not be established because SSL context could not be constructed", e); } } + +@Override +public KeyStore loadKeystore(String filename, String password) { +try { +return loadKeystore(filename, password, sslKeyStoreType); +} catch (IOException | GeneralSecurityException e) { +throw new RemoteTransportException( +"Server SSL connection could not be established because SSL context could not be constructed", Review Comment: Here we can be more specific: "Server SSL connection could not be established because keystore could not be loaded" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
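Putting the thread together, a configuration that opts into JKS stores for internal SSL might be set up as below. This assumes the new SSL_INTERNAL_KEYSTORE_TYPE / SSL_INTERNAL_TRUSTSTORE_TYPE constants land with these names (the former is referenced in the diff above); paths and passwords are placeholders.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;

/** Sketch: selecting JKS keystore/truststore types for internal SSL via the options in this PR. */
final class SslStoreTypeExample {

    static Configuration internalSslWithJks() {
        Configuration config = new Configuration();
        config.set(SecurityOptions.SSL_INTERNAL_ENABLED, true);
        config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE, "/path/to/internal.jks");
        config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "storePassword");
        config.set(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD, "keyPassword");
        config.set(SecurityOptions.SSL_INTERNAL_TRUSTSTORE, "/path/to/internal-trust.jks");
        config.set(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD, "trustPassword");
        // New options added by this PR; the generated docs above show "pkcs12" as the default.
        config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE, "JKS");
        config.set(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_TYPE, "JKS");
        return config;
    }
}
```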
Re: [PR] [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 [flink]
LadyForest closed pull request #24889: [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 URL: https://github.com/apache/flink/pull/24889 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Adding Opensearch Connector v2.0.0 [flink-web]
snuyanzin merged PR #741: URL: https://github.com/apache/flink-web/pull/741 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Adding Opensearch Connector v1.2.0 [flink-web]
snuyanzin merged PR #740: URL: https://github.com/apache/flink-web/pull/740 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Enable ci for v1.2 [flink-connector-opensearch]
snuyanzin merged PR #47: URL: https://github.com/apache/flink-connector-opensearch/pull/47 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35140][docs] Update Opensearch connector docs for 1.2.0 and 2.0.0 releases [flink]
snuyanzin opened a new pull request, #24921: URL: https://github.com/apache/flink/pull/24921 ## What is the purpose of the change Add docs for the new Opensearch connector releases. In fact the docs are the same for both v1 and v2, so we just reference one branch (references to multiple branches are not supported here, IIRC). ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (docs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35140][docs] Update Opensearch connector docs for 1.2.0 and 2.0.0 releases [flink]
flinkbot commented on PR #24921: URL: https://github.com/apache/flink/pull/24921#issuecomment-2161796100 ## CI report: * 84a66fe40a34e3e74b68beee473005b903a69fe3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: core [flink]
1996fanrui commented on PR #24881: URL: https://github.com/apache/flink/pull/24881#issuecomment-2161888308 > Thanks @GOODBOY008 for the quick update, LGTM now. Hi @1996fanrui, could you help double check this PR? Thanks @Jiabao-Sun for the ping, I will check it in the next 2 days. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35540][cdc-common] Fix table missed when database and table are with the same name in release-3.1.1 [flink-cdc]
leonardBang merged PR #3409: URL: https://github.com/apache/flink-cdc/pull/3409 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]
1996fanrui commented on PR #24895: URL: https://github.com/apache/flink/pull/24895#issuecomment-2161973417 It seems one timer-related test fails; it may be caused by this PR. `StreamTaskCancellationTest.testCancelTaskShouldPreventAdditionalEventTimeTimersFromBeingFired:272->testCancelTaskShouldPreventAdditionalTimersFromBeingFired:300 » IllegalState` https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60198&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=10477 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35234][hotfix][cdc-common] Fix NullPointerException of org.apache.flink.cdc.common.configuration.ConfigurationUtils#convertToString [flink-cdc]
Jiabao-Sun commented on code in PR #3255: URL: https://github.com/apache/flink-cdc/pull/3255#discussion_r1635733171

## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/ConfigurationUtils.java: ##

@@ -130,7 +130,9 @@ static Duration convertToDuration(Object o) {
     }

     static String convertToString(Object o) {
-        if (o.getClass() == String.class) {
+        if (o == null) {

Review Comment: The inner null check seems redundant now.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
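[Editor's note] To make the review point concrete, here is a hedged sketch of a null-safe `convertToString` in the shape discussed above; with the early `return null`, any later null check inside the method becomes redundant. This is an illustration only, not the actual `ConfigurationUtils` implementation.

```java
// Illustrative only; mirrors the discussed early-return pattern, not the real CDC code.
final class ConvertToStringSketch {
    static String convertToString(Object o) {
        if (o == null) {
            return null; // early return: any later null check is now redundant
        }
        if (o.getClass() == String.class) {
            return (String) o;
        }
        // assumed generic fallback for non-String values
        return String.valueOf(o);
    }
}
```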
Re: [PR] [FLINK-34545][cdc-pipeline-connector]Add OceanBase pipeline connector to Flink CDC [flink-cdc]
yuxiqian commented on PR #3360: URL: https://github.com/apache/flink-cdc/pull/3360#issuecomment-2161996630 Hi @yuanoOo, thanks for your great contribution! @whhe, since you are familiar with OceanBase, do you have time to take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-32091][checkpoint] Add file size metrics for file-merging [flink]
fredia opened a new pull request, #24922: URL: https://github.com/apache/flink/pull/24922 ## What is the purpose of the change Add file size and file count metrics for file-merging. ## Brief change log - Add `logicalFileCount`, `logicalFileSize`, `physicalFileCount`, `physicalFileSize` metrics. ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). This change added tests and can be verified as follows: - `FileMergingMetricsTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
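[Editor's note] As a rough illustration of how such gauges are typically registered on a Flink `MetricGroup`: the metric names below follow the PR description, while the `FileMergingStats` interface is a hypothetical stand-in for the file-merging manager, not its real API.

```java
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

class FileMergingMetricsSketch {

    /** Hypothetical accessor interface standing in for the file-merging snapshot manager. */
    interface FileMergingStats {
        long logicalFileCount();
        long logicalFileSize();
        long physicalFileCount();
        long physicalFileSize();
    }

    static void registerMetrics(MetricGroup group, FileMergingStats stats) {
        // One gauge per metric listed in the PR description.
        group.gauge("logicalFileCount", (Gauge<Long>) stats::logicalFileCount);
        group.gauge("logicalFileSize", (Gauge<Long>) stats::logicalFileSize);
        group.gauge("physicalFileCount", (Gauge<Long>) stats::physicalFileCount);
        group.gauge("physicalFileSize", (Gauge<Long>) stats::physicalFileSize);
    }
}
```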
Re: [PR] Enable async profiler [flink-benchmarks]
Zakelly merged PR #90: URL: https://github.com/apache/flink-benchmarks/pull/90 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32091][checkpoint] Add file size metrics for file-merging [flink]
flinkbot commented on PR #24922: URL: https://github.com/apache/flink/pull/24922#issuecomment-2162053937 ## CI report: * 3405d0cc2c400c9c2d3a596b600b2ebda07af2f9 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MailboxProcessor#processMailsWhenDefaultActionUnavailable avoids allocating Optional [flink]
schlosna opened a new pull request, #24923: URL: https://github.com/apache/flink/pull/24923 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
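[Editor's note] The title describes a common hot-path optimization: polling the mailbox without wrapping the result in `Optional`. The sketch below shows the general pattern with simplified, made-up `Mail`/`Mailbox` types; it is not the actual `MailboxProcessor` code or its signatures.

```java
import java.util.Optional;

class MailboxPollingSketch {
    interface Mail { void run(); }
    interface Mailbox {
        /** Returns the next mail, or null if none is available. */
        Mail tryTake();
    }

    // Before: every poll allocates an Optional, even when the mailbox is empty.
    static boolean processWithOptional(Mailbox mailbox) {
        Optional<Mail> maybeMail = Optional.ofNullable(mailbox.tryTake());
        maybeMail.ifPresent(Mail::run);
        return maybeMail.isPresent();
    }

    // After: a plain null check does the same work without the wrapper allocation.
    static boolean processWithoutOptional(Mailbox mailbox) {
        Mail mail = mailbox.tryTake();
        if (mail == null) {
            return false;
        }
        mail.run();
        return true;
    }
}
```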
[PR] [FLINK-35570] Consider PlaceholderStreamStateHandle in checkpoint file merging [flink]
Zakelly opened a new pull request, #24924: URL: https://github.com/apache/flink/pull/24924 ## What is the purpose of the change In checkpoint file merging, we should take `PlaceholderStreamStateHandle` into account during lifecycle, since it can be a file merged one. ## Brief change log - take `PlaceholderStreamStateHandle` into account in `reusePreviousStateHandle` and `couldReuseStateHandle` ## Verifying this change This change is already covered by existing tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
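[Editor's note] To illustrate the idea in the description, with simplified stand-in types rather than the actual Flink state-handle classes: a placeholder handle reported instead of re-uploading unchanged state may refer to a file-merged segment, so reuse checks have to treat it like the handle it replaces.

```java
class FileMergingReuseSketch {
    interface StateHandle {}
    /** Simplified stand-in for a handle pointing into a merged checkpoint file. */
    static final class SegmentFileStateHandle implements StateHandle {}
    /** Simplified stand-in for a placeholder sent instead of re-uploading unchanged state. */
    static final class PlaceholderStateHandle implements StateHandle {}

    /** Returns true if the handle may reference a merged file and must be tracked for reuse. */
    static boolean couldReuse(StateHandle handle) {
        // Considering only SegmentFileStateHandle would miss placeholders that stand in
        // for previously reported, file-merged handles.
        return handle instanceof SegmentFileStateHandle
                || handle instanceof PlaceholderStateHandle;
    }
}
```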
[PR] Revert FLINK-34123&35068: Add built-in dedicated serialization support for Maps, Lists, Sets, and Collections [flink]
X-czh opened a new pull request, #24925: URL: https://github.com/apache/flink/pull/24925 ## What is the purpose of the change Reverting previous changes that break state compatibility. [FLINK-34123](https://issues.apache.org/jira/browse/FLINK-34123) & [FLINK-35068](https://issues.apache.org/jira/browse/FLINK-35068) introduced built-in dedicated serialization support for Maps, Sets, Lists, and Collections. However, if a user defines a collection-typed value in state, the default serializer is changed from Kryo to dedicated serializer, breaking compatibility. Furthermore, since RocksDBStateBackend does not support serialization format change for the keys in MapState, we cannot implement a fully-compatible solution for this case. ## Brief change log Revert the following three commits: 1. [FLINK-34123][core][type] Introduce built-in serialization support for Map, List, and Collection 2. [FLINK-34123][docs][type] Add doc for built-in serialization support for Map, List, and Collection 3. [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
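[Editor's note] For context, this is the kind of declaration affected by the compatibility concern described above: a collection-typed state value whose default serializer would silently switch from Kryo to a dedicated serializer on upgrade. The example is generic and is not taken from the reverted commits.

```java
import java.util.List;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

class CollectionStateExample {
    // Restoring this state from a savepoint written with the Kryo serializer would fail
    // if the runtime now derives a dedicated List serializer for the same declaration.
    static final ValueStateDescriptor<List<String>> TAGS =
            new ValueStateDescriptor<>(
                    "tags", TypeInformation.of(new TypeHint<List<String>>() {}));
}
```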
Re: [PR] MailboxProcessor#processMailsWhenDefaultActionUnavailable avoids allocating Optional [flink]
flinkbot commented on PR #24923: URL: https://github.com/apache/flink/pull/24923#issuecomment-2162103603 ## CI report: * 88f9a4cf7a8e79d861755262f84f91a4b1b0d026 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35570] Consider PlaceholderStreamStateHandle in checkpoint file merging [flink]
flinkbot commented on PR #24924: URL: https://github.com/apache/flink/pull/24924#issuecomment-2162103785 ## CI report: * 9ce0a6f2bb299224ed170ce0944de94703253b81 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org