Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]

2024-06-11 Thread via GitHub


pnowojski commented on code in PR #24895:
URL: https://github.com/apache/flink/pull/24895#discussion_r1634312543


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java:
##
@@ -307,18 +307,37 @@ void onProcessingTime(long time) throws Exception {
 }
 
 public void advanceWatermark(long time) throws Exception {
-currentWatermark = time;
+Preconditions.checkState(
+tryAdvanceWatermark(
+time,
+() -> {
+// Never stop advancing.
+return false;
+}));
+}
 
+/**
+ * @return true if following watermarks can be processed immediately. False if the firing timers
+ *     should be interrupted as soon as possible.
+ */
+public boolean tryAdvanceWatermark(
+long time, InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopAdvancingFn)
+throws Exception {
+currentWatermark = time;
 InternalTimer<K, N> timer;
-
+boolean stop = cancellationContext.isCancelled();
 while ((timer = eventTimeTimersQueue.peek()) != null
 && timer.getTimestamp() <= time
-&& !cancellationContext.isCancelled()) {
+&& !stop) {
 keyContext.setCurrentKey(timer.getKey());
 eventTimeTimersQueue.poll();
 triggerTarget.onEventTime(timer);
 taskIOMetricGroup.getNumFiredTimers().inc();
+// Check if we should stop advancing after at least one iteration to guarantee progress
+// and prevent a potential starvation.
+stop = cancellationContext.isCancelled() || shouldStopAdvancingFn.test();
 }
+return true;

Review Comment:
   That explains the failing test :) Thanks for spotting it.
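   For context, a minimal, self-contained sketch of the intended contract (an illustration of the pattern only, not the actual Flink code): the firing loop should report whether it was interrupted by the caller's predicate, e.g. by returning `!stop` instead of a constant `true`.

   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.function.BooleanSupplier;

   public class TryAdvanceSketch {
       /**
        * Fires all timers with timestamp <= time, checking the stop predicate only after each
        * firing so that at least one timer is processed per call (guaranteeing progress).
        *
        * @return true if every eligible timer was fired; false if firing was interrupted
        *     and the caller should resume later.
        */
       static boolean tryAdvance(Queue<Long> timers, long time, BooleanSupplier shouldStop) {
           boolean stop = false;
           Long timer;
           while ((timer = timers.peek()) != null && timer <= time && !stop) {
               timers.poll();               // "fire" the timer
               stop = shouldStop.getAsBoolean();
           }
           return !stop;                    // a version returning a constant true would hide interruptions
       }

       public static void main(String[] args) {
           Queue<Long> timers = new ArrayDeque<>(java.util.List.of(1L, 2L, 3L));
           System.out.println(tryAdvance(timers, 5L, () -> true));  // false: interrupted after one firing
           System.out.println(tryAdvance(timers, 5L, () -> false)); // true: drained the remaining timers
       }
   }
   ```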



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-33858) CI fails with No space left on device

2024-06-11 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-33858.
-
Resolution: Fixed

> CI fails with No space left on device
> -
>
> Key: FLINK-33858
> URL: https://issues.apache.org/jira/browse/FLINK-33858
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Reporter: Sergey Nuyanzin
>Priority: Blocker
>
> AlibabaCI003-agent01
> AlibabaCI003-agent03
> AlibabaCI003-agent05
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=9765]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][cdc-connector][mysql] skip SchemaChangeEvents that were not included in capturedTableFilter. [flink-cdc]

2024-06-11 Thread via GitHub


steveGuRen commented on PR #2986:
URL: https://github.com/apache/flink-cdc/pull/2986#issuecomment-2159992536

   @joyCurry30 @leonardBang  Logically it is a bug that I have to read all tables in the schema, and that otherwise the first "alter table" statement is not triggered. It works today only because TableDiscoveryUtils loads all tables. But in a SaaS environment with that many tables (five hundred thousand, since each tenant has its own tables), this causes issues, because we cannot load all tables when discovering a new table means running "show create table" for every table.
   So I fixed TableDiscoveryUtils to load only a single table at startup or on a DDL change, and updated the shouldEmit logic as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-28867) Parquet reader support nested type in array/map type

2024-06-11 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853942#comment-17853942
 ] 

Jingsong Lee commented on FLINK-28867:
--

Looks not easy to support... Call for contributors...

> Parquet reader support nested type in array/map type
> 
>
> Key: FLINK-28867
> URL: https://issues.apache.org/jira/browse/FLINK-28867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.16.0
>Reporter: dalongliu
>Priority: Major
> Attachments: ReadParquetArray1.java, part-00121.parquet
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][cdc-connector][mysql] skip SchemaChangeEvents that were not included in capturedTableFilter. [flink-cdc]

2024-06-11 Thread via GitHub


steveGuRen commented on PR #2986:
URL: https://github.com/apache/flink-cdc/pull/2986#issuecomment-216384

   In the design of TiDB, I find that they take into account the scenario of 
having a large number of tables in a SaaS environment. Therefore, special 
attention is given to the performance of DDL operations in such contexts. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-11 Thread via GitHub


gaborgsomogyi commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634351465


##
flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java:
##
@@ -50,7 +50,7 @@ public abstract class OutputStreamBasedPartFileWriter
 
 @Nullable final Path targetPath;
 
-private CompactingFileWriter.Type writeType = null;
+private Type writeType = null;

Review Comment:
   Reading the code and seeing a bare `Type` requires some digging. Renaming it to `WriterType` would be unnecessary duplication, so I would vote for keeping it prefixed with the class it comes from.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add Flink 1.19.1 release [flink-web]

2024-06-11 Thread via GitHub


leonardBang commented on code in PR #745:
URL: https://github.com/apache/flink-web/pull/745#discussion_r1634358337


##
docs/content/posts/2024-06-14-release-1.19.1.md:
##
@@ -0,0 +1,128 @@
+---
+title:  "Apache Flink 1.19.1 Release Announcement"
+date: "2024-06-14T00:00:00.000Z"
+aliases:
+- /news/2024/06/14/release-1.19.1.html

Review Comment:
   Tip: please update the date when firing the release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35564) The topic cannot be distributed on subtask when calculatePartitionOwner returns -1

2024-06-11 Thread Jira
中国无锡周良 created FLINK-35564:
--

 Summary: The topic cannot be distributed on subtask when 
calculatePartitionOwner returns -1
 Key: FLINK-35564
 URL: https://issues.apache.org/jira/browse/FLINK-35564
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.17.2
Reporter: 中国无锡周良


The topic cannot be distributed on subtask when calculatePartitionOwner returns 
-1
{code:java}
@VisibleForTesting
static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
    int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    /*
     * Here, the assumption is that the id of Pulsar partitions are always ascending starting from
     * 0. Therefore, can be used directly as the offset clockwise from the start index.
     */
    return (startIndex + partitionId) % parallelism;
} {code}
Here startIndex is a non-negative number calculated based on topic.hashCode() 
and in the range [0, parallelism-1].

For a non-partitioned topic, partitionId is NON_PARTITION_ID = -1;

but
{code:java}
@Override
public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
        List<Integer> readers) {
    if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
        return Optional.empty();
    }

    Map<Integer, List<PulsarPartitionSplit>> assignMap =
            new HashMap<>(pendingPartitionSplits.size());

    for (Integer reader : readers) {
        Set<PulsarPartitionSplit> splits = pendingPartitionSplits.remove(reader);
        if (splits != null && !splits.isEmpty()) {
            assignMap.put(reader, new ArrayList<>(splits));
        }
    }

    if (assignMap.isEmpty()) {
        return Optional.empty();
    } else {
        return Optional.of(new SplitsAssignment<>(assignMap));
    }
} {code}
No reader can have index -1, right? But the calculation above returns -1 for this topic, so pendingPartitionSplits.remove(reader) never yields its split and the topic is never assigned to any subtask. I simulated this topic locally and found that its messages were indeed not processed.
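To make the failure mode concrete, here is a minimal, runnable sketch (illustration only; it only mirrors the arithmetic above, with NON_PARTITION_ID = -1 for non-partitioned topics):
{code:java}
public class PartitionOwnerDemo {
    private static final int NON_PARTITION_ID = -1; // id used for non-partitioned topics

    public static void main(String[] args) {
        int parallelism = 4;
        int startIndex = 0; // e.g. a topic whose hash maps to start index 0
        // Java's % operator keeps the sign of the dividend, so the owner is -1,
        // which never matches any reader index in [0, parallelism - 1].
        int owner = (startIndex + NON_PARTITION_ID) % parallelism;
        System.out.println(owner); // prints -1
    }
}
{code}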



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35559][Connector/Kinesis] fix shading configuration to avoid class conflicts when using flink-connector-kinesis alongside Firehose sink [flink-connector-aws]

2024-06-11 Thread via GitHub


hlteoh37 merged PR #142:
URL: https://github.com/apache/flink-connector-aws/pull/142


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35068][core][type] Introduce built-in serialization support for java.util.Set [flink]

2024-06-11 Thread via GitHub


reswqa merged PR #24845:
URL: https://github.com/apache/flink/pull/24845


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-35068) Introduce built-in serialization support for Set

2024-06-11 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo closed FLINK-35068.
--
Resolution: Done

master(1.20) via e1b45684394541ee290a3d81cc59a85623396c42

> Introduce built-in serialization support for Set
> 
>
> Key: FLINK-35068
> URL: https://issues.apache.org/jira/browse/FLINK-35068
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Introduce built-in serialization support for {{{}Set{}}}, another common Java 
> collection type. We'll need to add a new built-in serializer for it 
> ({{{}MultiSetTypeInformation{}}} utilizes {{MapSerializer}} underneath, but 
> it could be more efficient for common {{{}Set{}}}).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35458) Flink 2.0: Add serializer upgrade test for set serializer

2024-06-11 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35458:
---
Summary: Flink 2.0: Add serializer upgrade test for set serializer  (was: 
Add serializer upgrade test for set serializer)

> Flink 2.0: Add serializer upgrade test for set serializer
> -
>
> Key: FLINK-35458
> URL: https://issues.apache.org/jira/browse/FLINK-35458
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Zhanghao Chen
>Priority: Major
> Fix For: 2.0.0
>
>
> New dedicated serializer for Sets is introduced in 
> [FLINK-35068|https://issues.apache.org/jira/browse/FLINK-35068]. Since 
> serializer upgrade test requires at least one previous release to test the 
> upgrade of set serializers (which does not exist yet), we'll add the upgrade 
> test for set serializer after the release of v1.20.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30400) Stop bundling connector-base in externalized connectors

2024-06-11 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853965#comment-17853965
 ] 

Danny Cranmer commented on FLINK-30400:
---

I agree this is frustrating. It is a similar problem with 
{{flink-streaming-java}} and {{flink-clients}} that I am sure many users 
stumble on. Personally I create an "intellij" Maven profile that I enable in 
the IDE which includes these dependencies. It would be ideal if we could 
provide a Flink dev plugin or similar that sets these things up.
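As an illustration only (not part of this issue), such an IDE-only profile could look roughly like the sketch below; the profile id, the dependency list, and the {{flink.version}} property are assumptions:
{code:xml}
<!-- Hypothetical "intellij" profile: enabled only in the IDE so that provided-scope
     Flink dependencies are on the classpath when running jobs/tests locally. -->
<profile>
  <id>intellij</id>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>
</profile>
{code}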

> Stop bundling connector-base in externalized connectors
> ---
>
> Key: FLINK-30400
> URL: https://issues.apache.org/jira/browse/FLINK-30400
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Common
>Reporter: Chesnay Schepler
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available
> Fix For: elasticsearch-3.1.0, aws-connector-4.2.0, kafka-3.1.0, 
> rabbitmq-3.1.0, kafka-3.0.2
>
>
> Check that none of the externalized connectors bundle connector-base; if so 
> remove the bundling and schedule a new minor release.
> Bundling this module is highly problematic w.r.t. binary compatibility, since 
> bundled classes may rely on internal APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31238) Use IngestDB to speed up Rocksdb rescaling recovery

2024-06-11 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853966#comment-17853966
 ] 

Robert Metzger commented on FLINK-31238:


[~mayuehappy] what's the status of creating a FrocksDB release?

> Use IngestDB to speed up Rocksdb rescaling recovery 
> 
>
> Key: FLINK-31238
> URL: https://issues.apache.org/jira/browse/FLINK-31238
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2023-02-27-16-41-18-552.png, 
> image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png, 
> image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png, 
> image-2023-03-09-15-28-32-363.png, image-2023-03-09-15-41-03-074.png, 
> image-2023-03-09-15-41-08-379.png, image-2023-03-09-15-45-56-081.png, 
> image-2023-03-09-15-46-01-176.png, image-2023-03-09-15-50-04-281.png, 
> image-2023-03-29-15-25-21-868.png, image-2023-07-17-14-37-38-864.png, 
> image-2023-07-17-14-38-56-946.png, image-2023-07-22-14-16-31-856.png, 
> image-2023-07-22-14-19-01-390.png, image-2023-08-08-21-32-43-783.png, 
> image-2023-08-08-21-34-39-008.png, image-2023-08-08-21-39-39-135.png, 
> screenshot-1.png
>
>
> (The detailed design is in this document
> [https://docs.google.com/document/d/10MNVytTsyiDLZQSR89kDkVdmK_YjbM6jh0teerfDFfI|https://docs.google.com/document/d/10MNVytTsyiDLZQSR89kDkVdmK_YjbM6jh0teerfDFfI])
> There have been many discussions and optimizations in the community about 
> optimizing rocksdb scaling and recovery.
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We hope to discuss some of our explorations under this ticket
> The process of scaling and recovering in rocksdb simply requires two steps
>  # Insert the valid keyGroup data of the new task.
>  # Delete the invalid data in the old stateHandle.
> The current method for data writing is to specify the main Db first and then 
> insert data using writeBatch. In addition, the method of deleteRange is 
> currently used to speed up the ClipDB. But in our production environment, we 
> found that the speed of rescaling is still very slow, especially when the 
> state of a single Task is large. 
>  
> We hope that the previous sst file can be reused directly when restoring 
> state, instead of retraversing the data. So we made some attempts to optimize 
> it in our internal version of flink and frocksdb.
>  
> We added two APIs *ClipDb* and *IngestDb* in frocksdb. 
> ClipDB is used to clip the data of a DB. Different from db.DeleteRange and 
> db.Delete, DeleteValue and RangeTombstone will not be generated for parts 
> beyond the key range. We will iterate over the FileMetaData of db. Process 
> each sst file. There are three situations here. 
> If all the keys of a file are required, we will keep the sst file and do 
> nothing 
> If all the keys of the sst file exceed the specified range, we will delete 
> the file directly. 
> If we only need some part of the sst file, we will rewrite the required keys 
> to generate a new sst file.
> All sst file changes will be placed in a VersionEdit, and the current 
> versions will LogAndApply this edit to ensure that these changes can take 
> effect
>  * IngestDb is used to directly ingest all sst files of one DB into another 
> DB. But it is necessary to strictly ensure that the keys of the two DBs do 
> not overlap, which is easy to do in the Flink scenario. The hard link method 
> will be used in the process of ingesting files, so it will be very fast. At 
> the same time, the file number of the main DB will be incremented 
> sequentially, and the SequenceNumber of the main DB will be updated to the 
> larger SequenceNumber of the two DBs.
> When IngestDb and ClipDb are supported, the state restoration logic is as 
> follows
>  * Open the first StateHandle as the main DB and pause the compaction.
>  * Clip the main DB according to the KeyGroup range of the Task with ClipDB
>  * Open other StateHandles in sequence as Tmp DB, and perform ClipDb  
> according to the KeyGroup range
>  * Ingest all tmpDb into the main Db after tmpDb cliped
>  * Open the Compaction process of the main DB
> !screenshot-1.png|width=923,height=243!
> We have done some benchmark tests on the internal Flink version, and the test 
> results show that compared with the writeBatch method, the expansion and 
> recovery speed of IngestDb can be increased by 5 to 10 times as follows 
> (SstFileWriter means uses the recovery method of generating sst files through 
> SstFileWriter in

[jira] [Updated] (FLINK-35559) Shading issue cause class conflict

2024-06-11 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-35559:

Fix Version/s: aws-connector-3.1.0

> Shading issue cause class conflict
> --
>
> Key: FLINK-35559
> URL: https://issues.apache.org/jira/browse/FLINK-35559
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Firehose, Connectors / 
> Kinesis
>Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0
>Reporter: Aleksandr Pilipenko
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: aws-connector-3.1.0, aws-connector-4.4.0
>
> Attachments: Screenshot 2024-06-08 at 18.19.30.png
>
>
> Incorrect shading configuration causes ClassCastException during exception 
> handling when a job packages flink-connector-kinesis together with 
> flink-connector-aws-kinesis-firehose.
> {code:java}
> java.lang.ClassCastException: class 
> software.amazon.awssdk.services.firehose.model.FirehoseException cannot be 
> cast to class 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  (software.amazon.awssdk.services.firehose.model.FirehoseException and 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  are in unnamed module of loader 'app')
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
>  ~[utils-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
>  ~[sdk-core-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(A

[jira] [Commented] (FLINK-35559) Shading issue cause class conflict

2024-06-11 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853972#comment-17853972
 ] 

Hong Liang Teoh commented on FLINK-35559:
-

merged commit 
[{{20e4ee6}}|https://github.com/apache/flink-connector-aws/commit/20e4ee627e9570042a8b24c4d7bc7f115299ee1a]
 into   apache:main

> Shading issue cause class conflict
> --
>
> Key: FLINK-35559
> URL: https://issues.apache.org/jira/browse/FLINK-35559
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Firehose, Connectors / 
> Kinesis
>Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0
>Reporter: Aleksandr Pilipenko
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: aws-connector-3.1.0, aws-connector-4.4.0
>
> Attachments: Screenshot 2024-06-08 at 18.19.30.png
>
>
> Incorrect shading configuration causes ClassCastException during exception 
> handling when a job packages flink-connector-kinesis together with 
> flink-connector-aws-kinesis-firehose.
> {code:java}
> java.lang.ClassCastException: class 
> software.amazon.awssdk.services.firehose.model.FirehoseException cannot be 
> cast to class 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  (software.amazon.awssdk.services.firehose.model.FirehoseException and 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  are in unnamed module of loader 'app')
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
>  ~[utils-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
>  ~[sdk-core-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(

[jira] [Commented] (FLINK-35559) Shading issue cause class conflict

2024-06-11 Thread Hong Liang Teoh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853971#comment-17853971
 ] 

Hong Liang Teoh commented on FLINK-35559:
-

[~a.pilipenko] can we backport to v3.1 branch?

> Shading issue cause class conflict
> --
>
> Key: FLINK-35559
> URL: https://issues.apache.org/jira/browse/FLINK-35559
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Firehose, Connectors / 
> Kinesis
>Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0
>Reporter: Aleksandr Pilipenko
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: aws-connector-3.1.0, aws-connector-4.4.0
>
> Attachments: Screenshot 2024-06-08 at 18.19.30.png
>
>
> Incorrect shading configuration causes ClassCastException during exception 
> handling when a job packages flink-connector-kinesis together with 
> flink-connector-aws-kinesis-firehose.
> {code:java}
> java.lang.ClassCastException: class 
> software.amazon.awssdk.services.firehose.model.FirehoseException cannot be 
> cast to class 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  (software.amazon.awssdk.services.firehose.model.FirehoseException and 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  are in unnamed module of loader 'app')
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
>  ~[utils-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
>  ~[sdk-core-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.p

[jira] [Updated] (FLINK-34123) Introduce built-in serialization support for Map and List

2024-06-11 Thread Zhanghao Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanghao Chen updated FLINK-34123:
--
Release Note: Dedicated serialization support is introduced for four common Java 
collection types: Map, List, Set, and Collection, with better serialization 
performance than falling back to the generic Kryo serializer.
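For illustration (a sketch; the class and field names are made up), a POJO like the following is what this change targets, since its collection-typed fields are meant to be handled by the new built-in serializers rather than the Kryo fallback:
{code:java}
import java.util.List;
import java.util.Map;
import java.util.Set;

// Example POJO: with 1.20, fields of these collection types are intended to use
// the dedicated built-in serializers instead of the generic Kryo serializer.
public class UserEvent {
    public Map<String, Long> counters;
    public List<String> tags;
    public Set<String> visitedPages;

    public UserEvent() {} // Flink POJOs require a public no-arg constructor
}
{code}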

> Introduce built-in serialization support for Map and List
> -
>
> Key: FLINK-34123
> URL: https://issues.apache.org/jira/browse/FLINK-34123
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Introduce built-in serialization support for Map and List, two common 
> collection types for which Flink already have custom serializers implemented.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35068) Introduce built-in serialization support for Set

2024-06-11 Thread Zhanghao Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanghao Chen updated FLINK-35068:
--
Release Note: Dedicated serialization support is introduced for four common Java 
collection types: Map, List, Set, and Collection, with better serialization 
performance than falling back to the generic Kryo serializer.

> Introduce built-in serialization support for Set
> 
>
> Key: FLINK-35068
> URL: https://issues.apache.org/jira/browse/FLINK-35068
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Type Serialization System
>Affects Versions: 1.20.0
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Introduce built-in serialization support for {{{}Set{}}}, another common Java 
> collection type. We'll need to add a new built-in serializer for it 
> ({{{}MultiSetTypeInformation{}}} utilizes {{MapSerializer}} underneath, but 
> it could be more efficient for common {{{}Set{}}}).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35559) Shading issue cause class conflict

2024-06-11 Thread Aleksandr Pilipenko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853979#comment-17853979
 ] 

Aleksandr Pilipenko commented on FLINK-35559:
-

{quote}can we backport to v3.1 branch?
{quote}
No need since v3.x branch does not have kinesis connector, which has this issue

> Shading issue cause class conflict
> --
>
> Key: FLINK-35559
> URL: https://issues.apache.org/jira/browse/FLINK-35559
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Firehose, Connectors / 
> Kinesis
>Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0
>Reporter: Aleksandr Pilipenko
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: aws-connector-3.1.0, aws-connector-4.4.0
>
> Attachments: Screenshot 2024-06-08 at 18.19.30.png
>
>
> Incorrect shading configuration causes ClassCastException during exception 
> handling when a job packages flink-connector-kinesis together with 
> flink-connector-aws-kinesis-firehose.
> {code:java}
> java.lang.ClassCastException: class 
> software.amazon.awssdk.services.firehose.model.FirehoseException cannot be 
> cast to class 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  (software.amazon.awssdk.services.firehose.model.FirehoseException and 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  are in unnamed module of loader 'app')
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
>  ~[utils-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
>  ~[sdk-core-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Completa

[jira] [Updated] (FLINK-35559) Shading issue cause class conflict

2024-06-11 Thread Aleksandr Pilipenko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aleksandr Pilipenko updated FLINK-35559:

Fix Version/s: (was: aws-connector-3.1.0)

> Shading issue cause class conflict
> --
>
> Key: FLINK-35559
> URL: https://issues.apache.org/jira/browse/FLINK-35559
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Firehose, Connectors / 
> Kinesis
>Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0
>Reporter: Aleksandr Pilipenko
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: aws-connector-4.4.0
>
> Attachments: Screenshot 2024-06-08 at 18.19.30.png
>
>
> Incorrect shading configuration causes ClassCastException during exception 
> handling when a job packages flink-connector-kinesis together with 
> flink-connector-aws-kinesis-firehose.
> {code:java}
> java.lang.ClassCastException: class 
> software.amazon.awssdk.services.firehose.model.FirehoseException cannot be 
> cast to class 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  (software.amazon.awssdk.services.firehose.model.FirehoseException and 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  are in unnamed module of loader 'app')
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
>  ~[utils-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
>  ~[sdk-core-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(Asy

Re: [PR] [FLINK-35282][FLINK-35520] PyFlink Support for Apache Beam > 2.49 [flink]

2024-06-11 Thread via GitHub


hlteoh37 commented on code in PR #24908:
URL: https://github.com/apache/flink/pull/24908#discussion_r1634474656


##
docs/layouts/shortcodes/generated/python_configuration.html:
##
@@ -24,7 +24,7 @@
 python.executable
 "python"
 String
-Specify the path of the python interpreter used to execute the 
python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam 
(version == 2.43.0), Pip (version >= 20.3) and SetupTools (version >= 
37.0.0). Please ensure that the specified environment meets the above 
requirements. The option is equivalent to the command line option 
"-pyexec".
+   Specify the path of the python interpreter used to execute the 
python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam 
(version >= 2.52.0), Pip (version >= 20.3) and SetupTools (version >= 
37.0.0). Please ensure that the specified environment meets the above 
requirements. The option is equivalent to the command line option 
"-pyexec".

Review Comment:
   Verified that Beam >= 2.48 still supports Python 3.8+



##
pom.xml:
##
@@ -160,7 +160,7 @@ under the License.
1.3
3.23.1
0.10.9.7
-   2.43.0

Review Comment:
   Could we update this to 2.54?
   
   This mitigates additional CVEs from netty that we pull in via Beam
   
   https://github.com/apache/beam/issues/29861
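   For illustration, the suggested bump would look roughly like this in pom.xml; the property name is an assumption, since the XML tags were stripped from the quoted diff above:
   ```xml
   <!-- Hypothetical property name; the quoted diff only shows the value 2.43.0. -->
   <beam.version>2.54.0</beam.version>
   ```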



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32081][checkpoint] Compatibility between file-merging on and off across job runs [flink]

2024-06-11 Thread via GitHub


Zakelly closed pull request #24873: [FLINK-32081][checkpoint] Compatibility 
between file-merging on and off across job runs
URL: https://github.com/apache/flink/pull/24873


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32081) Compatibility between file-merging on and off across job runs

2024-06-11 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853989#comment-17853989
 ] 

Zakelly Lan commented on FLINK-32081:
-

master: f33808c0f02c12488d4de2f4d517da8b8b100224

> Compatibility between file-merging on and off across job runs
> -
>
> Key: FLINK-32081
> URL: https://issues.apache.org/jira/browse/FLINK-32081
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-32081) Compatibility between file-merging on and off across job runs

2024-06-11 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan resolved FLINK-32081.
-
Resolution: Fixed

> Compatibility between file-merging on and off across job runs
> -
>
> Key: FLINK-32081
> URL: https://issues.apache.org/jira/browse/FLINK-32081
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL

2024-06-11 Thread Ran Tao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853990#comment-17853990
 ] 

Ran Tao commented on FLINK-12450:
-

[~kartikeypant] thanks for pushing this issue; feel free to assign it to yourself if 
it's still valid. :)

> [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table 
> API and SQL
> ---
>
> Key: FLINK-12450
> URL: https://issues.apache.org/jira/browse/FLINK-12450
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhanchun Zhang
>Assignee: Ran Tao
>Priority: Major
>  Labels: auto-unassigned, stale-assigned
>
> BIT_LSHIFT, Shifts a long number to the left
> BIT_RSHIFT, Shifts a long number to the right



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-32091) Add necessary metrics for file-merging

2024-06-11 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-32091:
---

Assignee: Yanfei Lei  (was: Hangxiang Yu)

> Add necessary metrics for file-merging
> --
>
> Key: FLINK-32091
> URL: https://issues.apache.org/jira/browse/FLINK-32091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35565) Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset

2024-06-11 Thread Naci Simsek (Jira)
Naci Simsek created FLINK-35565:
---

 Summary: Flink KafkaSource Batch Job Gets Into Infinite Loop after 
Resetting Offset
 Key: FLINK-35565
 URL: https://issues.apache.org/jira/browse/FLINK-35565
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: kafka-3.1.0
 Environment: This is reproduced on a *Flink 1.18.1* with the latest 
Kafka connector 3.1.0-1.18 on a session cluster.
Reporter: Naci Simsek
 Attachments: image-2024-06-11-11-19-09-889.png, 
taskmanager_localhost_54489-ac092a_log.txt

h2. Summary

Flink batch job gets into an infinite fetch loop and cannot gracefully finish if 
the connected Kafka topic is empty and the starting offset value in the Flink job 
is lower than the current start/end offset of the related topic. See below for 
details:
h2. How to reproduce

Flink +*batch*+ job which works as a {*}KafkaSource{*}, will consume events 
from Kafka topic.

Related Kafka topic is empty, there are no events, and the offset value is as 
below: *15*

!image-2024-06-11-11-19-09-889.png|width=895,height=256!

 

Flink job uses a *specific starting offset* value, which is +*less*+ than the 
current offset of the topic/partition.

See below, it is set as “4”

{code:java}
package naci.grpId;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class KafkaSource_Print {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// Define the specific offsets for the partitions
Map<TopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // Start 
from offset 4 for partition 0

KafkaSource<String> kafkaSource = KafkaSource
.<String>builder()
.setBootstrapServers("localhost:9093")  // Make sure the port 
is correct
.setTopics("topic_test")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
.setBounded(OffsetsInitializer.latest())
.build();

DataStream<String> stream = env.fromSource(
kafkaSource,
WatermarkStrategy.noWatermarks(),
"Kafka Source"
);
stream.print();

env.execute("Flink KafkaSource test job");
}
}{code}

 

Here are the initial logs printed related to the offset, as soon as the job 
gets submitted:

{code:java}
2024-05-30 12:15:50,010 INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, 
StoppingOffset: 15]]
2024-05-30 12:15:50,069 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, 
StoppingOffset: 15]]]
2024-05-30 12:15:50,074 TRACE 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
Seeking starting offsets to specified offsets: {topic_test-0=4}
2024-05-30 12:15:50,074 INFO  org.apache.kafka.clients.consumer.KafkaConsumer   
   [] - [Consumer clientId=KafkaSource--2381765882724812354-0, 
groupId=null] Seeking to offset 4 for partition topic_test-0
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
SplitsChange handling result: [topic_test-0, start:4, stop: 15]
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished running task AddSplitsTask: [[[Partition: topic_test-0, 
StartingOffset: 4, StoppingOffset: 15]]]
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run FetchTask{code}

 

Since the starting offset {color:#FF}*4*{color} is *out of range* for the 
Kafka topic, KafkaConsumer initiates an {*}offset +reset+{*}, as seen on task 
manager logs:

{code:java}
2024-05-30 12:15:50,193 INFO  
org.apache.kafka.clients.consumer.internals.Fetcher  [] - [Consumer 
clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position 
FetchPosition{offset=4, offsetEpoch=Optional.empty, 
currentLeader=Le

[jira] [Updated] (FLINK-35565) Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset

2024-06-11 Thread Naci Simsek (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Naci Simsek updated FLINK-35565:

Description: 
h2. Summary

Flink batch job gets into an infinite fetch loop and cannot gracefully finish if 
the connected Kafka topic is empty and the starting offset value in the Flink job 
is lower than the current start/end offset of the related topic. See below for 
details:
h2. How to reproduce

A Flink +*batch*+ job, which works as a {*}KafkaSource{*}, will consume events 
from the Kafka topic.

The related Kafka topic is empty, there are no events, and the offset value is as 
below: *15*

!image-2024-06-11-11-19-09-889.png|width=895,height=256!

 

Flink job uses a *specific starting offset* value, which is +*less*+ than the 
current offset of the topic/partition.

See below, it is set as “4”

 
{code:java}
package naci.grpId;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class KafkaSource_Print {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Define the specific offsets for the partitions
        Map<TopicPartition, Long> specificOffsets = new HashMap<>();
        specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // Start from offset 4 for partition 0

        KafkaSource<String> kafkaSource = KafkaSource
                .<String>builder()
                .setBootstrapServers("localhost:9093")  // Make sure the port is correct
                .setTopics("topic_test")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
                .setBounded(OffsetsInitializer.latest())
                .build();

        DataStream<String> stream = env.fromSource(
                kafkaSource,
                WatermarkStrategy.noWatermarks(),
                "Kafka Source"
        );
        stream.print();

        env.execute("Flink KafkaSource test job");
    }
}{code}
 

 

Here are the initial logs printed related to the offset, as soon as the job 
gets submitted:

 
{code:java}
2024-05-30 12:15:50,010 INFO  
org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, 
StoppingOffset: 15]]
2024-05-30 12:15:50,069 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, 
StoppingOffset: 15]]]
2024-05-30 12:15:50,074 TRACE 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
Seeking starting offsets to specified offsets: {topic_test-0=4}
2024-05-30 12:15:50,074 INFO  org.apache.kafka.clients.consumer.KafkaConsumer   
   [] - [Consumer clientId=KafkaSource--2381765882724812354-0, 
groupId=null] Seeking to offset 4 for partition topic_test-0
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
SplitsChange handling result: [topic_test-0, start:4, stop: 15]
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
Finished running task AddSplitsTask: [[[Partition: topic_test-0, 
StartingOffset: 4, StoppingOffset: 15]]]
2024-05-30 12:15:50,075 DEBUG 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Prepare 
to run FetchTask{code}
 

Since the starting offset {color:#ff}*4*{color} is *out of range* for the 
Kafka topic, KafkaConsumer initiates an {*}offset +reset+{*}, as seen in the task 
manager logs:

 
{code:java}
2024-05-30 12:15:50,193 INFO  
org.apache.kafka.clients.consumer.internals.Fetcher  [] - [Consumer 
clientId=KafkaSource--2381765882724812354-0, groupId=null] Fetch position 
FetchPosition{offset=4, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[nacis-mbp-m2:9093 (id: 1 rack: 
null)], epoch=0}} is out of range for partition topic_test-0, resetting offset
2024-05-30 12:15:50,195 INFO  
org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer 
clientId=KafkaSource--2381765882724812354-0, groupId=null] Resetting offset for 
partition topic_test-0 to position FetchPosition{offset=15, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[nacis

[jira] [Created] (FLINK-35566) Consider promoting TypeSerializer from PublicEvolving to Public

2024-06-11 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-35566:
--

 Summary: Consider promoting TypeSerializer from PublicEvolving to 
Public
 Key: FLINK-35566
 URL: https://issues.apache.org/jira/browse/FLINK-35566
 Project: Flink
  Issue Type: Technical Debt
  Components: API / Core
Reporter: Martijn Visser


While working on implementing FLINK-35378, I ran into the problem that 
TypeSerializer is still PublicEvolving, and has been since Flink 1.0. We should 
consider annotating it as Public. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error

2024-06-11 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35567:
--
Affects Version/s: cdc-3.1.1
   (was: cdc-3.2.0)

> CDC BinaryWriter cast NullableSerializerWrapper error 
> --
>
> Key: FLINK-35567
> URL: https://issues.apache.org/jira/browse/FLINK-35567
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.1.1
>
>
> Currently, we generate data type serializers via 
> org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator#BinaryRecordDataGenerator(org.apache.flink.cdc.common.types.DataType[]),
> which wraps each of them in a 
> NullableSerializerWrapper.
> {code:java}
> // code placeholder
> public BinaryRecordDataGenerator(DataType[] dataTypes) {
> this(
> dataTypes,
> Arrays.stream(dataTypes)
> .map(InternalSerializers::create)
> .map(NullableSerializerWrapper::new)
> .toArray(TypeSerializer[]::new));
> } {code}
> However, when used in BinaryWriter#write, if the type is ARRAY/MAP/ROW, it will 
> cast the NullableSerializerWrapper to 
> ArrayDataSerializer/TypeSerializer/TypeSerializer.
> An exception will be thrown:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper cannot be 
> cast to org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer
>   at 
> org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter.write(BinaryWriter.java:134)
> at 
> org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator.generate(BinaryRecordDataGenerator.java:89)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error

2024-06-11 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35567:
-

 Summary: CDC BinaryWriter cast NullableSerializerWrapper error 
 Key: FLINK-35567
 URL: https://issues.apache.org/jira/browse/FLINK-35567
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Hongshun Wang
 Fix For: cdc-3.1.1


Currently, we generate data type serializers via 
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator#BinaryRecordDataGenerator(org.apache.flink.cdc.common.types.DataType[]),
which wraps each of them in a 
NullableSerializerWrapper.
{code:java}
// code placeholder
public BinaryRecordDataGenerator(DataType[] dataTypes) {
this(
dataTypes,
Arrays.stream(dataTypes)
.map(InternalSerializers::create)
.map(NullableSerializerWrapper::new)
.toArray(TypeSerializer[]::new));
} {code}
However, when used in BinaryWriter#write, if the type is ARRAY/MAP/ROW, it will 
cast the NullableSerializerWrapper to 
ArrayDataSerializer/TypeSerializer/TypeSerializer.
An exception will be thrown:
{code:java}
java.lang.ClassCastException: 
org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper cannot be 
cast to org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer
at 
org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter.write(BinaryWriter.java:134)
at 
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator.generate(BinaryRecordDataGenerator.java:89)
 {code}
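
For illustration only, a sketch of how the cast could be sidestepped from user code by 
using the two-argument constructor shown above with unwrapped serializers. Class and 
package names follow the snippet above, the DataTypes factory call is an assumption, 
and this is not necessarily the fix applied for this ticket:

{code:java}
// Sketch only: build the generator with the concrete serializers directly, so
// ARRAY/MAP/ROW fields keep their specific serializer types instead of being
// wrapped in NullableSerializerWrapper.
DataType[] dataTypes = new DataType[] {DataTypes.ARRAY(DataTypes.INT())};
BinaryRecordDataGenerator generator =
        new BinaryRecordDataGenerator(
                dataTypes,
                Arrays.stream(dataTypes)
                        .map(InternalSerializers::create)
                        .toArray(TypeSerializer[]::new));
{code}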



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35567) CDC BinaryWriter cast NullableSerializerWrapper error

2024-06-11 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated FLINK-35567:
--
Fix Version/s: cdc-3.2.0
   (was: cdc-3.1.1)

> CDC BinaryWriter cast NullableSerializerWrapper error 
> --
>
> Key: FLINK-35567
> URL: https://issues.apache.org/jira/browse/FLINK-35567
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: cdc-3.2.0
>
>
> Currently, we generate data type serializers via 
> org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator#BinaryRecordDataGenerator(org.apache.flink.cdc.common.types.DataType[]),
> which wraps each of them in a 
> NullableSerializerWrapper.
> {code:java}
> // code placeholder
> public BinaryRecordDataGenerator(DataType[] dataTypes) {
> this(
> dataTypes,
> Arrays.stream(dataTypes)
> .map(InternalSerializers::create)
> .map(NullableSerializerWrapper::new)
> .toArray(TypeSerializer[]::new));
> } {code}
> However, when used in BinaryWriter#write, if the type is ARRAY/MAP/ROW, it will 
> cast the NullableSerializerWrapper to 
> ArrayDataSerializer/TypeSerializer/TypeSerializer.
> An exception will be thrown:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper cannot be 
> cast to org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer
>   at 
> org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter.write(BinaryWriter.java:134)
> at 
> org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator.generate(BinaryRecordDataGenerator.java:89)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35559) Shading issue cause class conflict

2024-06-11 Thread Hong Liang Teoh (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh resolved FLINK-35559.
-
Resolution: Fixed

> Shading issue cause class conflict
> --
>
> Key: FLINK-35559
> URL: https://issues.apache.org/jira/browse/FLINK-35559
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / AWS, Connectors / Firehose, Connectors / 
> Kinesis
>Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0
>Reporter: Aleksandr Pilipenko
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: aws-connector-4.4.0
>
> Attachments: Screenshot 2024-06-08 at 18.19.30.png
>
>
> Incorrect shading configuration causes a ClassCastException during exception 
> handling when a job packages flink-connector-kinesis together with 
> flink-connector-aws-kinesis-firehose.
> {code:java}
> java.lang.ClassCastException: class 
> software.amazon.awssdk.services.firehose.model.FirehoseException cannot be 
> cast to class 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  (software.amazon.awssdk.services.firehose.model.FirehoseException and 
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.exception.AwsServiceException
>  are in unnamed module of loader 'app')
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionClassifierUtil.lambda$withAWSServiceErrorCode$0(AWSExceptionClassifierUtil.java:62)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:45)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier.isFatal(FatalExceptionClassifier.java:51)
>  ~[flink-connector-base-1.19.0.jar:1.19.0]
>   at 
> org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler.consumeIfFatal(AWSExceptionHandler.java:53)
>  ~[flink-connector-kinesis-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.handleFullyFailedRequest(KinesisFirehoseSinkWriter.java:218)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkWriter.lambda$submitRequestEntries$0(KinesisFirehoseSinkWriter.java:189)
>  ~[flink-connector-aws-kinesis-firehose-4.3-SNAPSHOT.jar:4.3-SNAPSHOT]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
>  ~[utils-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
>  ~[sdk-core-2.20.144.jar:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>  ~[?:?]
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
>  ~[?:?]
>   at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java

Re: [PR] [FLINK-35540][cdc-common][cdc-connector][mysql] fix lost table when database and table are with the same name [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3396:
URL: https://github.com/apache/flink-cdc/pull/3396


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]

2024-06-11 Thread via GitHub


GOODBOY008 opened a new pull request, #3408:
URL: https://github.com/apache/flink-cdc/pull/3408

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-11 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-35540:
---
Fix Version/s: cdc-3.2.0
   cdc-3.1.1

> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: linqigeng
>Assignee: linqigeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> h1. Description
> When the 'tables' parameter of a MySQL pipeline job contains a table whose 
> database name and table name are the same, like 'app.app', the job will fail 
> and the error message is like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named {{app}}, then submit a pipeline job 
> defined with a YAML like this:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-11 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-35540:
--

Assignee: linqigeng

> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: linqigeng
>Assignee: linqigeng
>Priority: Major
>  Labels: pull-request-available
>
> h1. Description
> When the 'tables' parameter of a MySQL pipeline job contains a table whose 
> database name and table name are the same, like 'app.app', the job will fail 
> and the error message is like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named {{app}}, then submit a pipeline job 
> defined with a YAML like this:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-11 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854012#comment-17854012
 ] 

Leonard Xu commented on FLINK-35540:


master: c958daf8ab417d16fc7fc0b59f341e8f9cf372e7

release-3.1: TODO

> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: linqigeng
>Assignee: linqigeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> h1. Description
> When the 'tables' parameter of a MySQL pipeline job contains a table whose 
> database name and table name are the same, like 'app.app', the job will fail 
> and the error message is like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named {{app}}, then submit a pipeline job 
> defined with a YAML like this:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]

2024-06-11 Thread via GitHub


GOODBOY008 commented on PR #3408:
URL: https://github.com/apache/flink-cdc/pull/3408#issuecomment-2160480202

   @leonardBang PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-3.1][FLINK-35545][doc] Miss 3.1.0 version in snapshot flink-cdc doc version list [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3408:
URL: https://github.com/apache/flink-cdc/pull/3408


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35545) Miss 3.1.0 version in snapshot flink-cdc doc version list

2024-06-11 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853114#comment-17853114
 ] 

Leonard Xu edited comment on FLINK-35545 at 6/11/24 11:22 AM:
--

master:4efb1d78ca778abeae142facfa99440f22a88b25

release-3.1:050c28649c0bd0068b5e7fe62331b257574572f2

release-3.0:a02b9dc019195ea317739a63738b0d5eaf1a6671


was (Author: leonard xu):
master:4efb1d78ca778abeae142facfa99440f22a88b25

release-3.1:93d5ee98da19bb878754bbc3780a3e23033ed331

release-3.0:a02b9dc019195ea317739a63738b0d5eaf1a6671

> Miss 3.1.0 version in snapshot flink-cdc doc version list
> -
>
> Key: FLINK-35545
> URL: https://issues.apache.org/jira/browse/FLINK-35545
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
> Attachments: image-2024-06-08-10-07-06-403.png, screenshot-1.png
>
>
> Link : [https://nightlies.apache.org/flink/flink-cdc-docs-master/]
> Miss 3.0.1 version in version list:
>  
> !screenshot-1.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3407:
URL: https://github.com/apache/flink-cdc/pull/3407


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp

2024-06-11 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852752#comment-17852752
 ] 

Leonard Xu edited comment on FLINK-34908 at 6/11/24 11:23 AM:
--

master: e2ccc836a056c16974e4956190bdce249705b7ee

3.1: 1112987572e487a79a1bbecf460705aa6153e0bb


was (Author: leonard xu):
master: e2ccc836a056c16974e4956190bdce249705b7ee

3.1: todo

> [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision 
> for timestamp
> ---
>
> Key: FLINK-34908
> URL: https://issues.apache.org/jira/browse/FLINK-34908
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> The Flink CDC pipeline decides the timestamp zone via the pipeline config. I found 
> that mysql2doris and mysql2starrocks specify the fixed datetime format 
> yyyy-MM-dd HH:mm:ss, which causes loss of datetime precision. I think we 
> shouldn't set a fixed datetime format and should just return the LocalDateTime object.
>  
>  
>  
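
For illustration (plain Java, independent of the connector code), a minimal sketch of 
the precision loss described above: formatting with a second-precision pattern drops 
the fractional seconds, while keeping the LocalDateTime object preserves them.

{code:java}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampPrecisionDemo {
    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2024, 6, 11, 12, 30, 45, 123456000);

        // Fixed second-precision pattern: the fractional seconds are lost.
        String truncated = ts.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println(truncated); // 2024-06-11 12:30:45

        // Keeping the LocalDateTime (or using a pattern with fractions) preserves them.
        System.out.println(ts);        // 2024-06-11T12:30:45.123456
    }
}
{code}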



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp

2024-06-11 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-34908.

Resolution: Fixed

> [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision 
> for timestamp
> ---
>
> Key: FLINK-34908
> URL: https://issues.apache.org/jira/browse/FLINK-34908
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> The Flink CDC pipeline decides the timestamp zone via the pipeline config. I found 
> that mysql2doris and mysql2starrocks specify the fixed datetime format 
> yyyy-MM-dd HH:mm:ss, which causes loss of datetime precision. I think we 
> shouldn't set a fixed datetime format and should just return the LocalDateTime object.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]

2024-06-11 Thread via GitHub


morazow commented on PR #24426:
URL: https://github.com/apache/flink/pull/24426#issuecomment-2160556166

   Thanks @HuangXingBo for the update!
   
   I have changed the GitHub actions to run the `auditwheel` repair after 
installing `patchelf` in Linux. Could you please have another look?
   
   You can find the build wheel artifacts for this change here: 
https://github.com/morazow/flink/actions/runs/9464510932


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-11 Thread via GitHub


ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634722169


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {
+uncompactedName = uncompactedName.substring(1);
+}
+return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);

Review Comment:
   I did not touch this method, I just moved the getWriterType above this to 
keep the method visibility order in the file intact, because that method became 
package-private.



##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {
+uncompactedName = uncompactedName.substring(1);
+}
+return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);

Review Comment:
   I did not touch this method, I just moved the getWriterType above this to 
keep the method visibility order in the file intact, because that method became 
package-private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-11 Thread via GitHub


ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634721788


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {

Review Comment:
   I did not touch this method, I just moved the `getWriterType` above this 
to keep the method visibility order in the file intact, because that method 
became package-private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32562][connectors/filesystem] Handle compressed streams correctly in ConcatFileCompactor [flink]

2024-06-11 Thread via GitHub


ferenc-csaky commented on code in PR #24914:
URL: https://github.com/apache/flink/pull/24914#discussion_r1634721788


##
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java:
##
@@ -154,23 +158,27 @@ private List getCompactingPath(CompactorRequest 
request) throws IOExceptio
 return compactingFiles;
 }
 
-private static Path assembleCompactedFilePath(Path uncompactedPath) {
-String uncompactedName = uncompactedPath.getName();
-if (uncompactedName.startsWith(".")) {
-uncompactedName = uncompactedName.substring(1);
-}
-return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);
-}
-
-private static CompactingFileWriter.Type getWriterType(FileCompactor 
fileCompactor) {
+@VisibleForTesting
+static Type getWriterType(FileCompactor fileCompactor) {
 if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
-return CompactingFileWriter.Type.OUTPUT_STREAM;
+return fileCompactor instanceof ConcatFileCompactor
+&& ((ConcatFileCompactor) 
fileCompactor).isCompressed()
+? Type.COMPRESSED_STREAM
+: Type.OUTPUT_STREAM;
 } else if (fileCompactor instanceof RecordWiseFileCompactor) {
-return CompactingFileWriter.Type.RECORD_WISE;
+return Type.RECORD_WISE;
 } else {
 throw new UnsupportedOperationException(
 "Unable to crate compacting file writer for compactor:"
 + fileCompactor.getClass());
 }
 }
+
+private static Path assembleCompactedFilePath(Path uncompactedPath) {
+String uncompactedName = uncompactedPath.getName();
+if (uncompactedName.startsWith(".")) {

Review Comment:
   I did not touch this method, I just moved the `getWriterType` above this to 
keep the method visibility order in the file intact, because that method became 
package-private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35543][HIVE] Upgrade Hive 2.3 connector to version 2.3.10 [flink]

2024-06-11 Thread via GitHub


pan3793 commented on PR #24905:
URL: https://github.com/apache/flink/pull/24905#issuecomment-2160590805

   ping @JingGe, is there anything I can do before merging?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35540][cdc-common] Fix table missed when database and table are with the same name in release-3.1.1 [flink-cdc]

2024-06-11 Thread via GitHub


qg-lin opened a new pull request, #3409:
URL: https://github.com/apache/flink-cdc/pull/3409

   https://issues.apache.org/jira/browse/FLINK-35540


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-11 Thread via GitHub


hlteoh37 commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1634758095


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
+import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import 
software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.beans.BeanInfo;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * A {@link ElementConverter} that converts an element to a {@link 
DynamoDbWriteRequest} using
+ * TypeInformation provided.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter
+implements ElementConverter {
+
+private final CompositeType typeInfo;
+private final boolean ignoreNulls;
+private final TableSchema tableSchema;
+
+/**
+ * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an 
element to a {@link
+ * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: 
{@code new
+ * 
DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+ *
+ * @param typeInfo The {@link CompositeType} that provides the type 
information for the element.
+ */
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo) {
+this(typeInfo, true);
+}
+
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo, 
boolean ignoreNulls) {
+
+try {
+this.typeInfo = typeInfo;
+this.ignoreNulls = ignoreNulls;
+this.tableSchema = createTableSchema(typeInfo);
+} catch (IntrospectionException | IllegalStateException | 
IllegalArgumentException e) {
+throw new FlinkRuntimeException("Failed to extract DynamoDb table 
schema", e);
+}
+}
+
+@Override
+public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) {
+Preconditions.checkNotNull(tableSchema, "TableSchema is not 
initialized");
+try {
+return DynamoDbWriteRequest.builder()
+.setType(DynamoDbWriteRequestType.PUT)
+.setItem(tableSchema.itemToMap(input, ignoreNulls))
+.build();
+} catch (ClassCastExcept

Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-11 Thread via GitHub


hlteoh37 commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1634758095


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
+import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import 
software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.beans.BeanInfo;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * A {@link ElementConverter} that converts an element to a {@link 
DynamoDbWriteRequest} using
+ * TypeInformation provided.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter
+implements ElementConverter {
+
+private final CompositeType typeInfo;
+private final boolean ignoreNulls;
+private final TableSchema tableSchema;
+
+/**
+ * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an 
element to a {@link
+ * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: 
{@code new
+ * 
DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+ *
+ * @param typeInfo The {@link CompositeType} that provides the type 
information for the element.
+ */
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo) {
+this(typeInfo, true);
+}
+
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo, 
boolean ignoreNulls) {
+
+try {
+this.typeInfo = typeInfo;
+this.ignoreNulls = ignoreNulls;
+this.tableSchema = createTableSchema(typeInfo);
+} catch (IntrospectionException | IllegalStateException | 
IllegalArgumentException e) {
+throw new FlinkRuntimeException("Failed to extract DynamoDb table 
schema", e);
+}
+}
+
+@Override
+public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) {
+Preconditions.checkNotNull(tableSchema, "TableSchema is not 
initialized");
+try {
+return DynamoDbWriteRequest.builder()
+.setType(DynamoDbWriteRequestType.PUT)
+.setItem(tableSchema.itemToMap(input, ignoreNulls))
+.build();
+} catch (ClassCastExcept

Re: [PR] [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter [flink-connector-aws]

2024-06-11 Thread via GitHub


hlteoh37 commented on code in PR #136:
URL: 
https://github.com/apache/flink-connector-aws/pull/136#discussion_r1634758095


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.NumericTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
+import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import 
software.amazon.awssdk.enhanced.dynamodb.internal.mapper.BeanAttributeGetter;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.beans.BeanInfo;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * A {@link ElementConverter} that converts an element to a {@link 
DynamoDbWriteRequest} using
+ * TypeInformation provided.
+ */
+@PublicEvolving
+public class DynamoDbTypeInformedElementConverter
+implements ElementConverter {
+
+private final CompositeType typeInfo;
+private final boolean ignoreNulls;
+private final TableSchema tableSchema;
+
+/**
+ * Creates a {@link DynamoDbTypeInformedElementConverter} that converts an 
element to a {@link
+ * DynamoDbWriteRequest} using the provided {@link CompositeType}. Usage: 
{@code new
+ * 
DynamoDbTypeInformedElementConverter<>(TypeInformation.of(MyPojoClass.class))}
+ *
+ * @param typeInfo The {@link CompositeType} that provides the type 
information for the element.
+ */
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo) {
+this(typeInfo, true);
+}
+
+public DynamoDbTypeInformedElementConverter(CompositeType typeInfo, 
boolean ignoreNulls) {
+
+try {
+this.typeInfo = typeInfo;
+this.ignoreNulls = ignoreNulls;
+this.tableSchema = createTableSchema(typeInfo);
+} catch (IntrospectionException | IllegalStateException | 
IllegalArgumentException e) {
+throw new FlinkRuntimeException("Failed to extract DynamoDb table 
schema", e);
+}
+}
+
+@Override
+public DynamoDbWriteRequest apply(T input, SinkWriter.Context context) {
+Preconditions.checkNotNull(tableSchema, "TableSchema is not 
initialized");
+try {
+return DynamoDbWriteRequest.builder()
+.setType(DynamoDbWriteRequestType.PUT)
+.setItem(tableSchema.itemToMap(input, ignoreNulls))
+.build();
+} catch (ClassCastExcept

[jira] [Updated] (FLINK-35541) Introduce retry limiting for AWS connector sinks

2024-06-11 Thread Aleksandr Pilipenko (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aleksandr Pilipenko updated FLINK-35541:

Affects Version/s: aws-connector-4.3.0

> Introduce retry limiting for AWS connector sinks
> 
>
> Key: FLINK-35541
> URL: https://issues.apache.org/jira/browse/FLINK-35541
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / AWS, Connectors / DynamoDB, Connectors / 
> Firehose, Connectors / Kinesis
>Affects Versions: aws-connector-4.2.0, aws-connector-4.3.0
>Reporter: Aleksandr Pilipenko
>Priority: Major
>
> Currently, if the record write operation in the sink consistently fails with a 
> retriable error, the sink will retry indefinitely. If the cause of the error is 
> not resolved, this may lead to a poison pill.
>  
> The proposal here is to add a configurable retry limit for each record. Users can 
> specify a maximum number of retries per record, and the sink will fail once the 
> retry limit is reached.
>  
> We will implement this for all AWS connectors:
>  * DDBSink
>  * Firehose Sink
>  * Kinesis Sink
>  
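
To illustrate the idea, a standalone sketch with invented names (not the actual sink 
API and not the planned implementation): a record may be retried only a bounded number 
of times before it is reported as failed.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, names invented for illustration only.
final class PerRecordRetryLimiter {
    private final int maxRetriesPerRecord;
    private final Map<String, Integer> attemptsByRecordId = new HashMap<>();

    PerRecordRetryLimiter(int maxRetriesPerRecord) {
        this.maxRetriesPerRecord = maxRetriesPerRecord;
    }

    /** Returns true if the record may be retried, false once the limit is exhausted. */
    boolean tryRetry(String recordId) {
        int attempts = attemptsByRecordId.merge(recordId, 1, Integer::sum);
        return attempts <= maxRetriesPerRecord;
    }
}
{code}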



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-11 Thread via GitHub


gaborgsomogyi commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634845033


##
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java:
##
@@ -59,13 +71,38 @@ public TrustManager[] trustManagers() {
 .fingerprints(sslCertFingerprints)
 .build();
 
-trustManagerFactory.init(loadKeystore(sslTrustStore, 
sslTrustStorePassword));
+trustManagerFactory.init(
+loadKeystore(sslTrustStore, sslTrustStorePassword, 
sslTrustStoreType));
 return trustManagerFactory.getTrustManagers();
-} catch (GeneralSecurityException e) {
+} catch (GeneralSecurityException | IOException e) {
 // replicate exception handling from SSLEngineProvider
 throw new RemoteTransportException(
 "Server SSL connection could not be established because 
SSL context could not be constructed",
 e);
 }
 }
+
+@Override
+public KeyStore loadKeystore(String filename, String password) {
+try {
+return loadKeystore(filename, password, sslKeyStoreType);
+} catch (IOException
+| CertificateException
+| NoSuchAlgorithmException
+| KeyStoreException e) {
+throw new RemoteTransportException(

Review Comment:
   I've checked out the code and played with it. I agree that's what we can do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-11 Thread via GitHub


gaborgsomogyi commented on code in PR #24919:
URL: https://github.com/apache/flink/pull/24919#discussion_r1634861389


##
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java:
##
@@ -285,7 +291,14 @@ private static KeyManagerFactory getKeyManagerFactory(
 : SecurityOptions.SSL_REST_KEY_PASSWORD,
 SecurityOptions.SSL_KEY_PASSWORD);
 
-KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+// do not use getAndCheckOption here as there is no fallback option 
and a default is
+// specified
+String keystoreType =
+internal
+? 
config.get(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE)
+: config.get(SecurityOptions.SSL_REST_KEYSTORE_TYPE);

Review Comment:
   Just a clarification for other reviewers: since there is a default value, it 
just doesn't make sense to provide a fallback.



##
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java:
##
@@ -59,13 +68,35 @@ public TrustManager[] trustManagers() {
 .fingerprints(sslCertFingerprints)
 .build();
 
-trustManagerFactory.init(loadKeystore(sslTrustStore, 
sslTrustStorePassword));
+trustManagerFactory.init(
+loadKeystore(sslTrustStore, sslTrustStorePassword, 
sslTrustStoreType));
 return trustManagerFactory.getTrustManagers();
-} catch (GeneralSecurityException e) {
+} catch (GeneralSecurityException | IOException e) {
 // replicate exception handling from SSLEngineProvider
 throw new RemoteTransportException(
 "Server SSL connection could not be established because 
SSL context could not be constructed",
 e);
 }
 }
+
+@Override
+public KeyStore loadKeystore(String filename, String password) {
+try {
+return loadKeystore(filename, password, sslKeyStoreType);
+} catch (IOException | GeneralSecurityException e) {
+throw new RemoteTransportException(
+"Server SSL connection could not be established because 
SSL context could not be constructed",

Review Comment:
   Here we can be more specific: "Server SSL connection could not be 
established because keystore could not be loaded"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35378) [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunction

2024-06-11 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-35378:
---
Summary: [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate 
SinkFunction  (was: [FLIP-453] Promote Unified Sink API V2 to Public and 
Deprecate SinkFunc)

> [FLIP-453] Promote Unified Sink API V2 to Public and Deprecate SinkFunction
> ---
>
> Key: FLINK-35378
> URL: https://issues.apache.org/jira/browse/FLINK-35378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Core
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
>
> https://cwiki.apache.org/confluence/pages/resumedraft.action?draftId=303794871&draftShareId=af4ace88-98b7-4a53-aece-cd67d2f91a15&;



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL

2024-06-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-12450:
---

Assignee: Kartikey Pant  (was: Ran Tao)

> [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table 
> API and SQL
> ---
>
> Key: FLINK-12450
> URL: https://issues.apache.org/jira/browse/FLINK-12450
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhanchun Zhang
>Assignee: Kartikey Pant
>Priority: Major
>  Labels: auto-unassigned, stale-assigned
>
> BIT_LSHIFT, Shifts a long number to the left
> BIT_RSHIFT, Shifts a long number to the right
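
A quick illustration of the intended semantics (the SQL below is only a sketch of how the functions would be invoked once implemented; the exact Flink SQL signatures are still to be defined):

{code:sql}
-- Illustrative only
SELECT BIT_LSHIFT(1, 3);   -- 1 shifted left by 3 bits  -> 8
SELECT BIT_RSHIFT(16, 2);  -- 16 shifted right by 2 bits -> 4
{code}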



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL

2024-06-11 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854054#comment-17854054
 ] 

lincoln lee commented on FLINK-12450:
-

Thank you everyone! Assigned to you [~kartikeypant]. 

> [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table 
> API and SQL
> ---
>
> Key: FLINK-12450
> URL: https://issues.apache.org/jira/browse/FLINK-12450
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Zhanchun Zhang
>Assignee: Kartikey Pant
>Priority: Major
>  Labels: auto-unassigned, stale-assigned
>
> BIT_LSHIFT, Shifts a long number to the left
> BIT_RSHIFT, Shifts a long number to the right



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26951) Add HASH supported in SQL & Table API

2024-06-11 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854060#comment-17854060
 ] 

lincoln lee commented on FLINK-26951:
-

Thanks [~kartikeypant] for volunteering this!

Before implementation, we should make it clear which hash algorithm will be 
used. This hash function is mainly for Hive compatibility purposes, so we need 
to clarify the details to ensure it is compatible with Hive.
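
As a concrete starting point for that discussion, one candidate is the Java-style folding of per-argument hash codes with a factor of 31; whether this actually matches Hive's semantics (e.g. ObjectInspectorUtils) still needs to be verified. A rough sketch, stated purely as an assumption to check:

{code:java}
// Assumption to verify: a Hive-style hash folds per-argument hash codes with a factor of 31.
static int hashSketch(Object... args) {
    int result = 0;
    for (Object arg : args) {
        result = 31 * result + (arg == null ? 0 : arg.hashCode());
    }
    return result;
}
{code}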

> Add HASH supported in SQL & Table API
> -
>
> Key: FLINK-26951
> URL: https://issues.apache.org/jira/browse/FLINK-26951
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: dalongliu
>Priority: Major
> Fix For: 1.20.0
>
>
> Returns a hash value of the arguments.
> Syntax:
> {code:java}
> hash(expr1, ...) {code}
> Arguments:
>  * {{{}exprN{}}}: An expression of any type.
> Returns:
> An INTEGER.
> Examples:
> {code:java}
> > SELECT hash('Flink', array(123), 2);
>  -1321691492 {code}
> See more:
>  * [Spark|https://spark.apache.org/docs/latest/api/sql/index.html#hash]
>  * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 [flink]

2024-06-11 Thread via GitHub


LadyForest closed pull request #24889: [Flink-35473][table] Improve Table/SQL 
Configuration for Flink 2.0
URL: https://github.com/apache/flink/pull/24889


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35263) FLIP-446: Kubernetes Operator State Snapshot CRD

2024-06-11 Thread Mate Czagany (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mate Czagany updated FLINK-35263:
-
Fix Version/s: (was: kubernetes-operator-1.9.0)

> FLIP-446: Kubernetes Operator State Snapshot CRD
> 
>
> Key: FLINK-35263
> URL: https://issues.apache.org/jira/browse/FLINK-35263
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Priority: Major
>
> Described in 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-446%3A+Kubernetes+Operator+State+Snapshot+CRD]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35265) Implement FlinkStateSnapshot custom resource

2024-06-11 Thread Mate Czagany (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mate Czagany updated FLINK-35265:
-
Fix Version/s: (was: kubernetes-operator-1.9.0)

> Implement FlinkStateSnapshot custom resource
> 
>
> Key: FLINK-35265
> URL: https://issues.apache.org/jira/browse/FLINK-35265
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35267) Create documentation for FlinkStateSnapshot CR

2024-06-11 Thread Mate Czagany (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mate Czagany updated FLINK-35267:
-
Fix Version/s: (was: kubernetes-operator-1.9.0)

> Create documentation for FlinkStateSnapshot CR
> --
>
> Key: FLINK-35267
> URL: https://issues.apache.org/jira/browse/FLINK-35267
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Priority: Major
>
> This should cover the new features and migration from the now deprecated 
> methods of taking snapshots.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35473) FLIP-457: Improve Table/SQL Configuration for Flink 2.0

2024-06-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan resolved FLINK-35473.
---
Resolution: Fixed

Fixed in master 93d49ff6eb9f61cb2450d0b25732f4d8923b840d, 
b7b1fc2c29995135b9005f07e385986a40c65621, 
fbf0f28fef737d47b45815d3f77c6a842167c3e8, 
fbacf22a057e52c06a10988c308dfb31afbbcb12, 
6dbe7bf5c306551836ec89c70f9aaab317f55e10, 
526f9b034763fd022a52fe84b2c3227c59a78df1

> FLIP-457: Improve Table/SQL Configuration for Flink 2.0
> ---
>
> Key: FLINK-35473
> URL: https://issues.apache.org/jira/browse/FLINK-35473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This is the parent task for 
> [FLIP-457|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35266) Add e2e tests for FlinkStateSnapshot CRs

2024-06-11 Thread Mate Czagany (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mate Czagany updated FLINK-35266:
-
Fix Version/s: (was: kubernetes-operator-1.9.0)

> Add e2e tests for FlinkStateSnapshot CRs
> 
>
> Key: FLINK-35266
> URL: https://issues.apache.org/jira/browse/FLINK-35266
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Reporter: Mate Czagany
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35473) FLIP-457: Improve Table/SQL Configuration for Flink 2.0

2024-06-11 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854068#comment-17854068
 ] 

Jane Chan edited comment on FLINK-35473 at 6/11/24 2:10 PM:


Fixed in master
93d49ff6eb9f61cb2450d0b25732f4d8923b840d,
b7b1fc2c29995135b9005f07e385986a40c65621, 
fbf0f28fef737d47b45815d3f77c6a842167c3e8,
fbacf22a057e52c06a10988c308dfb31afbbcb12,
6dbe7bf5c306551836ec89c70f9aaab317f55e10,
526f9b034763fd022a52fe84b2c3227c59a78df1


was (Author: qingyue):
Fixed in master 93d49ff6eb9f61cb2450d0b25732f4d8923b840d, 
b7b1fc2c29995135b9005f07e385986a40c65621, 
fbf0f28fef737d47b45815d3f77c6a842167c3e8, 
fbacf22a057e52c06a10988c308dfb31afbbcb12, 
6dbe7bf5c306551836ec89c70f9aaab317f55e10, 
526f9b034763fd022a52fe84b2c3227c59a78df1

> FLIP-457: Improve Table/SQL Configuration for Flink 2.0
> ---
>
> Key: FLINK-35473
> URL: https://issues.apache.org/jira/browse/FLINK-35473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This is the parent task for 
> [FLIP-457|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35473) FLIP-457: Improve Table/SQL Configuration for Flink 2.0

2024-06-11 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854068#comment-17854068
 ] 

Jane Chan edited comment on FLINK-35473 at 6/11/24 2:10 PM:


Fixed in master
93d49ff6eb9f61cb2450d0b25732f4d8923b840d,
b7b1fc2c29995135b9005f07e385986a40c65621, 
fbf0f28fef737d47b45815d3f77c6a842167c3e8,
fbacf22a057e52c06a10988c308dfb31afbbcb12,
6dbe7bf5c306551836ec89c70f9aaab317f55e10,
526f9b034763fd022a52fe84b2c3227c59a78df1


was (Author: qingyue):
Fixed in master
93d49ff6eb9f61cb2450d0b25732f4d8923b840d,
b7b1fc2c29995135b9005f07e385986a40c65621, 
fbf0f28fef737d47b45815d3f77c6a842167c3e8,
fbacf22a057e52c06a10988c308dfb31afbbcb12,
6dbe7bf5c306551836ec89c70f9aaab317f55e10,
526f9b034763fd022a52fe84b2c3227c59a78df1

> FLIP-457: Improve Table/SQL Configuration for Flink 2.0
> ---
>
> Key: FLINK-35473
> URL: https://issues.apache.org/jira/browse/FLINK-35473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This is the parent task for 
> [FLIP-457|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35473) FLIP-457: Improve Table/SQL Configuration for Flink 2.0

2024-06-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan closed FLINK-35473.
-

> FLIP-457: Improve Table/SQL Configuration for Flink 2.0
> ---
>
> Key: FLINK-35473
> URL: https://issues.apache.org/jira/browse/FLINK-35473
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> This is the parent task for 
> [FLIP-457|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=307136992].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35116) Upgrade JOSDK dependency to 4.8.3

2024-06-11 Thread Mate Czagany (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854084#comment-17854084
 ] 

Mate Czagany commented on FLINK-35116:
--

I think this can be closed as the PR has already been merged to main 
4ec4b319c2ba9927c32372c595017840768a67ee

> Upgrade JOSDK dependency to 4.8.3
> -
>
> Key: FLINK-35116
> URL: https://issues.apache.org/jira/browse/FLINK-35116
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Márton Balassi
>Assignee: Márton Balassi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> This bring a much needed fix for the operator HA behaviour:
> https://github.com/operator-framework/java-operator-sdk/releases/tag/v4.8.3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-20539) Type mismatch when using ROW in computed column

2024-06-11 Thread Pouria Pirzadeh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141
 ] 

Pouria Pirzadeh commented on FLINK-20539:
-

[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue (See 
below).
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.


```

Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);

tableEnv.sqlQuery("SELECT * FROM t1").execute().print();

```

Here is the error:
```

Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[*anonymous_datastream_source$1*, 
metadata=[rowtime]]])

```
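
For reference, a self-contained SQL-only variant that appears to hit the same code path (table layout and column types are inferred from the plan above, not the reporter's exact setup):

```sql
-- Hypothetical minimal reproduction; names/types inferred from the error output.
CREATE TABLE orders (
  name  STRING,
  price BIGINT
) WITH ('connector' = 'datagen');

SELECT CAST(ROW(name, price) AS ROW<name_val STRING, price_val BIGINT>) AS col
FROM orders;
```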

> Type mismatch when using ROW in computed column
> ---
>
> Key: FLINK-20539
> URL: https://issues.apache.org/jira/browse/FLINK-20539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>   "CREATE TABLE Orders (\n"
>   + "order_number BIGINT,\n"
>   + "priceINT,\n"
>   + "first_name   STRING,\n"
>   + "last_nameSTRING,\n"
>   + "buyer_name AS ROW(first_name, last_name)\n"
>   + ") WITH (\n"
>   + "  'connector' = 'datagen'\n"
>   + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT 
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT NULL buyer_name) NOT 
> NULL
> rel:
> LogicalProject(order_number=[$0], price=[$1], first_name=[$2], 
> last_name=[$3], buyer_name=[ROW($2, $3)])
>   LogicalTableScan(table=[[default_catalog, default_database, Orders]])
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-20539) Type mismatch when using ROW in computed column

2024-06-11 Thread Pouria Pirzadeh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141
 ] 

Pouria Pirzadeh edited comment on FLINK-20539 at 6/11/24 6:08 PM:
--

[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue (See 
below).
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.
{code:java}
Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);
tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code}
Here is the error:
{code:java}
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[anonymous_datastream_source$1, 
metadata=[rowtime]]]) {code}


was (Author: pouria):
[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue (See 
below).
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.


```

Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);

tableEnv.sqlQuery("SELECT * FROM t1").execute().print();

```

Here is the error:
```

Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[*anonymous_datastream_source$1*, 
metadata=[rowtime]]])

```

> Type mismatch when using ROW in computed column
> ---
>
> Key: FLINK-20539
> URL: https://issues.apache.org/jira/browse/FLINK-20539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>   "CREATE TABLE Orders (\n"
>   + "order_number BIGINT,\n"
>   + "priceINT,\n"
>   + "first_name   STRING,\n"
>   + "last_nameSTRING,\n"
>   + "buyer_name AS ROW(first_name, last_name)\n"
>   + ") WITH (\n"
>   + "  'connector' = 'datagen'\n"
>   + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT 
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE

[jira] [Comment Edited] (FLINK-20539) Type mismatch when using ROW in computed column

2024-06-11 Thread Pouria Pirzadeh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141
 ] 

Pouria Pirzadeh edited comment on FLINK-20539 at 6/11/24 6:08 PM:
--

[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue.
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.
{code:java}
Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);
tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code}
Here is the error:
{code:java}
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[anonymous_datastream_source$1, 
metadata=[rowtime]]]) {code}


was (Author: pouria):
[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue (See 
below).
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.
{code:java}
Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);
tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code}
Here is the error:
{code:java}
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[anonymous_datastream_source$1, 
metadata=[rowtime]]]) {code}

> Type mismatch when using ROW in computed column
> ---
>
> Key: FLINK-20539
> URL: https://issues.apache.org/jira/browse/FLINK-20539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>   "CREATE TABLE Orders (\n"
>   + "order_number BIGINT,\n"
>   + "priceINT,\n"
>   + "first_name   STRING,\n"
>   + "last_nameSTRING,\n"
>   + "buyer_name AS ROW(first_name, last_name)\n"
>   + ") WITH (\n"
>   + "  'connector' = 'datagen'\n"
>   + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT 
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE"

[jira] [Comment Edited] (FLINK-20539) Type mismatch when using ROW in computed column

2024-06-11 Thread Pouria Pirzadeh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854141#comment-17854141
 ] 

Pouria Pirzadeh edited comment on FLINK-20539 at 6/11/24 6:09 PM:
--

[~xuyangzhong]   [~martijnvisser] Thanks for looking into the issue and merging 
the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue.
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.
{code:java}
Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);
tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code}
Here is the error:
{code:java}
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[anonymous_datastream_source$1, 
metadata=[rowtime]]]) {code}


was (Author: pouria):
[~martijnvisser] Thanks for looking into the issue and merging the fix.

I tried below query on 1.19 and I am still hitting (a similar) issue.
I need to use CAST so I can create a Row with named fields.

The query works fine without CAST, so I believe the same issue (i.e. Data type 
mismatch between Calcite and Flink's type factory) is happening here too.
{code:java}
Table t1 = tableEnv.sqlQuery(
            "SELECT "
                + "CAST(ROW(name, price) AS ROW) AS col  "
                + "FROM orders");
tableEnv.createTemporaryView("t1", t1);
tableEnv.sqlQuery("SELECT * FROM t1").execute().print(); {code}
Here is the error:
{code:java}
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" name_val, BIGINT price_val) NOT NULL col) NOT NULL
converted type:
RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" name_val, 
BIGINT price_val) NOT NULL col) NOT NULL
rel:
LogicalProject(col=[$0])
  LogicalProject(col=[CAST(ROW($1, $2)):RecordType(VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" name_val, BIGINT price_val) NOT NULL])
    LogicalProject(ts=[$2], name=[$0], price=[$1], rowtime=[$3])
      LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$3])
        LogicalProject(name=[$0], price=[$1], ts=[$2], 
rowtime=[CAST($3):TIMESTAMP_LTZ(3) *ROWTIME*])
          LogicalTableScan(table=[[anonymous_datastream_source$1, 
metadata=[rowtime]]]) {code}

> Type mismatch when using ROW in computed column
> ---
>
> Key: FLINK-20539
> URL: https://issues.apache.org/jira/browse/FLINK-20539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>   "CREATE TABLE Orders (\n"
>   + "order_number BIGINT,\n"
>   + "priceINT,\n"
>   + "first_name   STRING,\n"
>   + "last_nameSTRING,\n"
>   + "buyer_name AS ROW(first_name, last_name)\n"
>   + ") WITH (\n"
>   + "  'connector' = 'datagen'\n"
>   + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT 
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-

[jira] [Closed] (FLINK-35116) Upgrade JOSDK dependency to 4.8.3

2024-06-11 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-35116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi closed FLINK-35116.
--
Resolution: Fixed

Thanks [~mateczagany] ! Indeed.

> Upgrade JOSDK dependency to 4.8.3
> -
>
> Key: FLINK-35116
> URL: https://issues.apache.org/jira/browse/FLINK-35116
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Márton Balassi
>Assignee: Márton Balassi
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> This bring a much needed fix for the operator HA behaviour:
> https://github.com/operator-framework/java-operator-sdk/releases/tag/v4.8.3



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Adding Opensearch Connector v2.0.0 [flink-web]

2024-06-11 Thread via GitHub


snuyanzin merged PR #741:
URL: https://github.com/apache/flink-web/pull/741


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Adding Opensearch Connector v1.2.0 [flink-web]

2024-06-11 Thread via GitHub


snuyanzin merged PR #740:
URL: https://github.com/apache/flink-web/pull/740


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Enable ci for v1.2 [flink-connector-opensearch]

2024-06-11 Thread via GitHub


snuyanzin merged PR #47:
URL: https://github.com/apache/flink-connector-opensearch/pull/47


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35140) Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19

2024-06-11 Thread Sergey Nuyanzin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-35140:

Summary: Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 
1.19  (was: Release flink-connector-opensearch vX.X.X for Flink 1.19)

> Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19
> 
>
> Key: FLINK-35140
> URL: https://issues.apache.org/jira/browse/FLINK-35140
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Opensearch
>Reporter: Danny Cranmer
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> [https://github.com/apache/flink-connector-opensearch]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35140][docs] Update Opensearch connector docs for 1.2.0 and 2.0.0 releases [flink]

2024-06-11 Thread via GitHub


snuyanzin opened a new pull request, #24921:
URL: https://github.com/apache/flink/pull/24921

   
   ## What is the purpose of the change
   Add docs for new Opensearch connector release
   In fact the docs are the same for both v1 and v2, so just referencing one branch 
(also, references to multiple branches are not supported here IIRC)
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: ( no )
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no )
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35140) Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19

2024-06-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35140:
---
Labels: pull-request-available  (was: )

> Release flink-connector-opensearch v1.2.0 and v.2.0.0 for Flink 1.19
> 
>
> Key: FLINK-35140
> URL: https://issues.apache.org/jira/browse/FLINK-35140
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Opensearch
>Reporter: Danny Cranmer
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> [https://github.com/apache/flink-connector-opensearch]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35140][docs] Update Opensearch connector docs for 1.2.0 and 2.0.0 releases [flink]

2024-06-11 Thread via GitHub


flinkbot commented on PR #24921:
URL: https://github.com/apache/flink/pull/24921#issuecomment-2161796100

   
   ## CI report:
   
   * 84a66fe40a34e3e74b68beee473005b903a69fe3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35242) Add per-type schema evolution behavior configuration

2024-06-11 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-35242:

Description: 
> Update: Changed `fine grained` terminology to avoid confusion with the 
> fine-grained job resource management feature.

Currently, Flink CDC allows three SE strategies: evolving the change, ignoring it, 
or throwing an exception. However, such a configuration strategy doesn't cover all 
use cases, and users want more fine-grained strategy configuration.

This ticket suggests adding one more strategy, "try_evolve" or 
"evolve_when_available". It's basically like the "evolving" option, but doesn't 
throw an exception if such an operation fails, which provides more flexibility.

Also, this ticket suggests allowing users to configure a per-schema-event 
strategy, so users could evolve some types of events (like rename column) and 
reject some unwelcome events (like truncate table, remove column).

  was:
Currently, Flink CDC allows three SE strategy: evolving it, ignoring it, or 
throwing an exception. However such configuration strategy doesn't cover all 
user cases and requires want more fine-grained strategy configuration.

This ticket suggests adding one more strategy "try_evolve" or 
"evolve_when_available". It's basically like "evolving" option, but doesn't 
throw an exception if such operation fails, which provides more flexibility.

Also, this ticket suggests allowing user to configure per-schema-event 
strategy, so users could evolve some types of event (like rename column) and 
reject some dangerous events (like truncate table, remove column).

Summary: Add per-type schema evolution behavior configuration  (was: 
Add fine-grained schema evolution strategy)

> Add per-type schema evolution behavior configuration
> 
>
> Key: FLINK-35242
> URL: https://issues.apache.org/jira/browse/FLINK-35242
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> > Update: Changed `fine grained` terminology to avoid confusion with the 
> > fine-grained job resource management feature.
> Currently, Flink CDC allows three SE strategies: evolving the change, ignoring it, 
> or throwing an exception. However, such a configuration strategy doesn't cover all 
> use cases, and users want more fine-grained strategy configuration.
> This ticket suggests adding one more strategy, "try_evolve" or 
> "evolve_when_available". It's basically like the "evolving" option, but doesn't 
> throw an exception if such an operation fails, which provides more flexibility.
> Also, this ticket suggests allowing users to configure a per-schema-event 
> strategy, so users could evolve some types of events (like rename column) and 
> reject some unwelcome events (like truncate table, remove column).
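
To make the per-event-type idea concrete, a hypothetical pipeline snippet is sketched below; the option names and values are purely illustrative placeholders for discussion, not an agreed design:

{code:yaml}
# Illustrative sketch only; keys and values are placeholders, not a final API.
pipeline:
  schema.change.behavior: try_evolve        # evolve when possible, skip instead of failing
sink:
  include.schema.changes: [add.column, rename.column]
  exclude.schema.changes: [drop.column, truncate.table]
{code}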



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-25537] [JUnit5 Migration] Module: flink-core with,Package: core [flink]

2024-06-11 Thread via GitHub


1996fanrui commented on PR #24881:
URL: https://github.com/apache/flink/pull/24881#issuecomment-2161888308

   > Thanks @GOODBOY008 for the quick update, LGTM now. Hi @1996fanrui, could 
you help double check this PR?
   
   Thanks @Jiabao-Sun for the ping, I will check it in the next 2 days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35540][cdc-common] Fix table missed when database and table are with the same name in release-3.1.1 [flink-cdc]

2024-06-11 Thread via GitHub


leonardBang merged PR #3409:
URL: https://github.com/apache/flink-cdc/pull/3409


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-11 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854012#comment-17854012
 ] 

Leonard Xu edited comment on FLINK-35540 at 6/12/24 1:51 AM:
-

master: c958daf8ab417d16fc7fc0b59f341e8f9cf372e7

release-3.1: 7287eaceca29d105bf5a7c74d75945a42a051016


was (Author: leonard xu):
master: c958daf8ab417d16fc7fc0b59f341e8f9cf372e7

release-3.1: TODO

> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: linqigeng
>Assignee: linqigeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> h1. Description
> When the 'tables' parameter of a MySQL pipeline job contains a table whose 
> database and table have the same name, like 'app.app', the job will fail 
> and the error message is like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named {{app}}, then submit a pipeline job 
> defined by a YAML like this:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35540) flink-cdc-pipeline-connector-mysql lost table which database and table with the same name

2024-06-11 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-35540.

Resolution: Fixed

> flink-cdc-pipeline-connector-mysql lost table which database and table with 
> the same name
> -
>
> Key: FLINK-35540
> URL: https://issues.apache.org/jira/browse/FLINK-35540
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: linqigeng
>Assignee: linqigeng
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0, cdc-3.1.1
>
>
> h1. Description
> When the 'tables' parameter of a MySQL pipeline job contains a table whose 
> database and table have the same name, like 'app.app', the job will fail 
> and the error message is like:
> {code:java}
> java.lang.IllegalArgumentException: Cannot find any table by the option 
> 'tables' = app.app {code}
> h1. How to reproduce
> Create a database and a table both named {{app}}, then submit a pipeline job 
> defined by a YAML like this:
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app.app
>   server-id: 5400-5404
>   server-time-zone: UTC
> sink:
>   type: doris
>   fenodes: 127.0.0.1:8030
>   username: root
>   password: ""
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1pipeline:
>   name: Sync MySQL Database to Doris
>   parallelism: 2 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35568) Add the imagePullSecrets property for FlinkDeployment spec

2024-06-11 Thread Gang Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gang Huang updated FLINK-35568:
---
Summary: Add the imagePullSecrets property for FlinkDeployment spec  (was: 
Add imagePullSecrets for FlinkDeployment spec)

> Add the imagePullSecrets property for FlinkDeployment spec
> --
>
> Key: FLINK-35568
> URL: https://issues.apache.org/jira/browse/FLINK-35568
> Project: Flink
>  Issue Type: Improvement
>Reporter: Gang Huang
>Priority: Blocker
>
> I am confused about how to configure imagePullSecrets for a private Docker 
> registry, since there seem to be no related parameters in the official 
> docs 
> (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/)
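
For anyone hitting the same question: one way this is typically handled today is through the deployment's pod template, which the operator merges into the generated JobManager/TaskManager pods. A minimal sketch (image and secret names are placeholders, assuming a pre-created image pull secret):

{code:yaml}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-job
spec:
  image: registry.example.com/flink:1.19      # image hosted in a private registry
  podTemplate:
    spec:
      imagePullSecrets:
        - name: my-registry-secret            # kubernetes.io/dockerconfigjson secret created beforehand
  # jobManager / taskManager / job sections omitted for brevity
{code}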



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35568) Add imagePullSecrets for FlinkDeployment spec

2024-06-11 Thread Gang Huang (Jira)
Gang Huang created FLINK-35568:
--

 Summary: Add imagePullSecrets for FlinkDeployment spec
 Key: FLINK-35568
 URL: https://issues.apache.org/jira/browse/FLINK-35568
 Project: Flink
  Issue Type: Improvement
Reporter: Gang Huang


I am confused about how to configure imagePullSecrets for a private Docker 
registry, since there seem to be no related parameters in the official docs 
(https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-20217][task] Allow certains operators to yield to unaligned checkpoint in case timers are firing [flink]

2024-06-11 Thread via GitHub


1996fanrui commented on PR #24895:
URL: https://github.com/apache/flink/pull/24895#issuecomment-2161973417

   It seems one timer-related test fails; it may have been caused by this PR.
   
   
`StreamTaskCancellationTest.testCancelTaskShouldPreventAdditionalEventTimeTimersFromBeingFired:272->testCancelTaskShouldPreventAdditionalTimersFromBeingFired:300
 » IllegalState`
   
   
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60198&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=10477
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35234][hotfix][cdc-common] Fix NullPointerException of org.apache.flink.cdc.common.configuration.ConfigurationUtils#convertToString [flink-cdc]

2024-06-11 Thread via GitHub


Jiabao-Sun commented on code in PR #3255:
URL: https://github.com/apache/flink-cdc/pull/3255#discussion_r1635733171


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/ConfigurationUtils.java:
##
@@ -130,7 +130,9 @@ static Duration convertToDuration(Object o) {
 }
 
 static String convertToString(Object o) {
-if (o.getClass() == String.class) {
+if (o == null) {

Review Comment:
   The inner null check seems redundant now.
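   
   In other words, with the early return the rest of the method no longer needs its own null handling; roughly (sketch only, remaining conversion branches elided; not the actual implementation):
   
   ```java
   static String convertToString(Object o) {
       if (o == null) {
           return null; // single early return covers the null case
       }
       if (o.getClass() == String.class) {
           return (String) o;
       }
       // ... conversions for the remaining supported types ...
       return o.toString(); // placeholder fallback for the sketch
   }
   ```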



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34545][cdc-pipeline-connector]Add OceanBase pipeline connector to Flink CDC [flink-cdc]

2024-06-11 Thread via GitHub


yuxiqian commented on PR #3360:
URL: https://github.com/apache/flink-cdc/pull/3360#issuecomment-2161996630

   Hi @yuanoOo, thanks for your great contribution! Since @whhe is familiar 
with OceanBase, do you have time to take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed

2024-06-11 Thread Jane Chan (Jira)
Jane Chan created FLINK-35569:
-

 Summary: 
SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging
 failed
 Key: FLINK-35569
 URL: https://issues.apache.org/jira/browse/FLINK-35569
 Project: Flink
  Issue Type: Bug
  Components: Build System / Azure Pipelines, Build System / CI
Affects Versions: 1.20.0
Reporter: Jane Chan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed

2024-06-11 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-35569:
--
Description: 
[https://github.com/apache/flink/actions/runs/9467135511/job/26081097181]

The parameterized test fails when RestoreMode is "CLAIM" and 
fileMergingAcrossBoundary is false.

> SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging
>  failed
> --
>
> Key: FLINK-35569
> URL: https://issues.apache.org/jira/browse/FLINK-35569
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Build System / CI
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Priority: Major
>
> [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181]
> The parameterized test fails when RestoreMode is "CLAIM" and 
> fileMergingAcrossBoundary is false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed

2024-06-11 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854224#comment-17854224
 ] 

Jane Chan commented on FLINK-35569:
---

Hi [~Zakelly], would you mind sparing some time to take a look?

> SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging
>  failed
> --
>
> Key: FLINK-35569
> URL: https://issues.apache.org/jira/browse/FLINK-35569
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Build System / CI
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Priority: Major
>
> [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181]
> The parameterized test fails when RestoreMode is "CLAIM" and 
> fileMergingAcrossBoundary is false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed

2024-06-11 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-35569:
---

Assignee: Zakelly Lan

> SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging
>  failed
> --
>
> Key: FLINK-35569
> URL: https://issues.apache.org/jira/browse/FLINK-35569
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Build System / CI
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Zakelly Lan
>Priority: Major
>
> [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181]
> The parameterized test fails when RestoreMode is "CLAIM" and 
> fileMergingAcrossBoundary is false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35569) SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging failed

2024-06-11 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854225#comment-17854225
 ] 

Zakelly Lan commented on FLINK-35569:
-

Thanks [~qingyue] , I'll take a look.

> SnapshotFileMergingCompatibilityITCase#testSwitchFromEnablingToDisablingFileMerging
>  failed
> --
>
> Key: FLINK-35569
> URL: https://issues.apache.org/jira/browse/FLINK-35569
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / Azure Pipelines, Build System / CI
>Affects Versions: 1.20.0
>Reporter: Jane Chan
>Assignee: Zakelly Lan
>Priority: Major
>
> [https://github.com/apache/flink/actions/runs/9467135511/job/26081097181]
> The parameterized test fails when RestoreMode is "CLAIM" and 
> fileMergingAcrossBoundary is false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-32091][checkpoint] Add file size metrics for file-merging [flink]

2024-06-11 Thread via GitHub


fredia opened a new pull request, #24922:
URL: https://github.com/apache/flink/pull/24922

   
   
   ## What is the purpose of the change
   
   Add file size and file count metrics for file-merging.
   
   
   ## Brief change log
   - Add `logicalFileCount`, `logicalFileSize`, `physicalFileCount`, 
`physicalFileSize` metrics.
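   
   A rough sketch of how such gauges might be registered (the stats holder and method names below are assumptions for illustration, not this PR's actual code):
   
   ```java
   import org.apache.flink.metrics.MetricGroup;
   
   final class FileMergingMetricsSketch {
       /** Placeholder stats holder standing in for the real file-merging manager. */
       interface FileMergingStats {
           long getLogicalFileCount();
           long getLogicalFileSize();
           long getPhysicalFileCount();
           long getPhysicalFileSize();
       }
   
       static void register(MetricGroup metricGroup, FileMergingStats stats) {
           MetricGroup group = metricGroup.addGroup("fileMerging");
           group.gauge("logicalFileCount", () -> stats.getLogicalFileCount());
           group.gauge("logicalFileSize", () -> stats.getLogicalFileSize());
           group.gauge("physicalFileCount", () -> stats.getPhysicalFileCount());
           group.gauge("physicalFileSize", () -> stats.getPhysicalFileSize());
       }
   }
   ```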
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   This change added tests and can be verified as follows:
   - `FileMergingMetricsTest`

   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32091) Add necessary metrics for file-merging

2024-06-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32091:
---
Labels: pull-request-available  (was: )

> Add necessary metrics for file-merging
> --
>
> Key: FLINK-32091
> URL: https://issues.apache.org/jira/browse/FLINK-32091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

