hawk9821 commented on code in PR #9665:
URL: https://github.com/apache/seatunnel/pull/9665#discussion_r2264687283


##########
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/commit/PaimonAggregatedCommitter.java:
##########
@@ -47,57 +47,94 @@ public class PaimonAggregatedCommitter
 
     private static final long serialVersionUID = 1L;
 
-    private final WriteBuilder tableWriteBuilder;
+    private final FileStoreTable table;
 
     public PaimonAggregatedCommitter(
             Table table, PaimonHadoopConfiguration paimonHadoopConfiguration) {
-        this.tableWriteBuilder = table.newStreamWriteBuilder();
+        this.table = (FileStoreTable) table;
         PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
     }
 
     @Override
     public List<PaimonAggregatedCommitInfo> commit(
             List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
-        try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
+        aggregatedCommitInfo.stream()
+                .collect(Collectors.groupingBy(PaimonAggregatedCommitInfo::getCommitUser))
+                .forEach(this::commit);
+        return Collections.emptyList();
+    }
+
+    private void commit(String commitUser, List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) {
+        try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
             PaimonSecurityContext.runSecured(
                     () -> {
                         log.debug("Trying to commit states streaming mode");
-                        aggregatedCommitInfo.stream()
-                                .flatMap(
-                                        paimonAggregatedCommitInfo ->
-                                                paimonAggregatedCommitInfo.getCommittablesMap()
-                                                        .entrySet().stream())
-                                .forEach(
-                                        entry ->
-                                                ((StreamTableCommit) tableCommit)
-                                                        .commit(entry.getKey(), entry.getValue()));
+                        Map<Long, List<CommitMessage>> committablesMap =
+                                aggregatedCommitInfo.stream()
+                                        .flatMap(
+                                                paimonAggregatedCommitInfo ->
+                                                        paimonAggregatedCommitInfo
+                                                                .getCommittablesMap().entrySet()
+                                                                .stream())
+                                        .collect(
+                                                Collectors.toMap(
+                                                        Map.Entry::getKey, Map.Entry::getValue));
+                        if (!committablesMap.isEmpty()) {
+                            tableCommit.filterAndCommit(committablesMap);
+                        }
                         return null;
                     });
         } catch (Exception e) {
             throw new PaimonConnectorException(
-                    PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED,
-                    "Paimon table storage write-commit Failed.",
-                    e);
+                    PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
         }
-        return Collections.emptyList();
     }
 
     @Override
     public PaimonAggregatedCommitInfo combine(List<PaimonCommitInfo> commitInfos) {
-        Map<Long, List<CommitMessage>> committables = new HashMap<>();
+        String commitUser = commitInfos.get(0).getCommitUser();
+        Map<Long, List<CommitMessage>> commitTables = new HashMap<>();
         commitInfos.forEach(
                 commitInfo ->
-                        committables
+                        commitTables
                                 .computeIfAbsent(
                                         commitInfo.getCheckpointId(),
                                         id -> new CopyOnWriteArrayList<>())
                                 .addAll(commitInfo.getCommittables()));
-        return new PaimonAggregatedCommitInfo(committables);
+        return new PaimonAggregatedCommitInfo(commitTables, commitUser);
     }
 
     @Override
     public void abort(List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) throws Exception {
-        // TODO find the right way to abort
+        aggregatedCommitInfo.stream()
+                .collect(Collectors.groupingBy(PaimonAggregatedCommitInfo::getCommitUser))
+                .forEach(this::abort);
+    }
+
+    private void abort(String commitUser, List<PaimonAggregatedCommitInfo> aggregatedCommitInfo) {
+        try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
+            PaimonSecurityContext.runSecured(
+                    () -> {
+                        log.debug("Trying to commit states streaming mode");
+                        Map<Long, List<CommitMessage>> committablesMap =
+                                aggregatedCommitInfo.stream()
+                                        .flatMap(
+                                                paimonAggregatedCommitInfo ->
+                                                        paimonAggregatedCommitInfo
+                                                                .getCommittablesMap().entrySet()
+                                                                .stream())
+                                        .collect(
+                                                Collectors.toMap(
+                                                        Map.Entry::getKey, Map.Entry::getValue));
+                        if (!committablesMap.isEmpty()) {
+                            committablesMap.forEach(tableCommit::commit);

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to