[PR] [INLONG-10575][Sort] Make mysql source support report audit information exactly once [inlong]

2024-07-06 Thread via GitHub


XiaoYou201 opened a new pull request, #10576:
URL: https://github.com/apache/inlong/pull/10576

   ### [INLONG-10575][Sort] Make mysql source support report audit information 
exactly once
   
   Fixes #10575
   
   ### Modifications
   1. Using the checkpoint principle in Flink to modify the process, the 
modified flow chart is shown in Figure 1 and Figure 2. In Figure 1, the 
callback method of notifyCompleteCheckpoint is used to upload audit information 
instead of scheduled upload.Each Source/Sink will save the checkpointId of the 
currently ongoing checkpoint, which is nowCheckpointId in the figure. When the 
audit information is written to the Buffer, nowCheckpointId will be attached, 
indicating that the audit information
   is written during this ongoing checkpoint. The audit information and 
checkpointId are in a many-to-one relationship.
   
   
![image](https://github.com/apache/inlong/assets/58425449/2f09c6c6-e9c2-4383-bca7-965333ccab65)
   When a snapshot request is received, the current operator's nowCheckpointId 
is updated to (snapshot) checkpointId + 1. When all operators in a task 
complete the snapshot, the notifyCompleteCheckpoint method is called back.
   
   At this time, AuditBuffer uploads audit information less than or equal to 
checkpointId (parameters in the notifyCompleteCheckpoint method).
   
   
![image](https://github.com/apache/inlong/assets/58425449/33d2de48-df21-49ce-96c7-d044d25f37b0)
   
   2. The getCurConsumedPartitions method gets the partitions assigned to the 
client by the tube server, including the partitions where the client has 
consumed data and the client has not consumed data. According to the previous 
logic, the offsets of the partitions that have not been consumed are not 
recorded. Here, the offsets of the partitions that have not been consumed are 
added.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [Umbrella] InLong Transform feature [inlong]

2024-07-06 Thread via GitHub


github-actions[bot] commented on issue #10022:
URL: https://github.com/apache/inlong/issues/10022#issuecomment-2212246344

   This issue is stale because it has been open for 60 days with no activity.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [INLONG-10568][Sort] Change the starrocks connector UNKNOWN data type handle method [inlong]

2024-07-06 Thread via GitHub


EMsnap merged PR #10570:
URL: https://github.com/apache/inlong/pull/10570


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(inlong) branch master updated: [INLONG-10568][Sort] Change the starrocks connector UNKNOWN datatype handle method (#10570)

2024-07-06 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new e08510 [INLONG-10568][Sort] Change the starrocks connector UNKNOWN 
datatype handle method (#10570)
e08510 is described below

commit e085107f2c4fb48918df7f5004f76be807e0
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Sun Jul 7 12:17:18 2024 +0800

[INLONG-10568][Sort] Change the starrocks connector UNKNOWN datatype handle 
method (#10570)
---
 .../sink/table/StarRocksDynamicTableSink.java  |   1 -
 .../table/sink/table/StarRocksSinkOP.java  |  42 +++
 .../sink/table/StarRocksTableRowTransformer.java   | 320 +
 3 files changed, 362 insertions(+), 1 deletion(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
index c5df06a3ab..418e99c4df 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.starrocks.table.sink.table;
 
-import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
 import 
com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionBase;
 import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksSinkOP.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksSinkOP.java
new file mode 100644
index 00..d37d7b0209
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksSinkOP.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.table;
+
+import org.apache.flink.types.RowKind;
+
+/**
+ * StarRocks sink operator.
+ * copy from {@link com.starrocks.connector.flink.row.sink.StarRocksSinkOP}
+ * not modified
+ */
+public enum StarRocksSinkOP {
+
+UPSERT, DELETE;
+
+public static final String COLUMN_KEY = "__op";
+
+static StarRocksSinkOP parse(RowKind kind) {
+if (RowKind.INSERT.equals(kind) || RowKind.UPDATE_AFTER.equals(kind)) {
+return UPSERT;
+}
+if (RowKind.DELETE.equals(kind) || RowKind.UPDATE_BEFORE.equals(kind)) 
{
+return DELETE;
+}
+throw new RuntimeException("Unsupported row kind.");
+}
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksTableRowTransformer.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksTableRowTransformer.java
new file mode 100644
index 00..6b4586a630
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksTableRowTransformer.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not u