This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
     new 0360e7e51 [Bug][Connector-V2][Doris] update last checkpoint id when doing snapshot (#4881)
0360e7e51 is described below

commit 0360e7e5185bd016458a20d3b597b4ccee83dfeb
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Sun Jun 4 23:15:32 2023 +0800

    [Bug][Connector-V2][Doris] update last checkpoint id when doing snapshot (#4881)
---
 .../seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index ac0927f08..876a12f77 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -59,7 +59,7 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo
     private static final int CONNECT_TIMEOUT = 1000;
     private static final List<String> DORIS_SUCCESS_STATUS =
             new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT));
-    private final long lastCheckpointId;
+    private long lastCheckpointId;
     private DorisStreamLoad dorisStreamLoad;
     volatile boolean loading;
     private final DorisConfig dorisConfig;
@@ -156,7 +156,8 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo
         this.dorisStreamLoad.setHostPort(getAvailableBackend());
         this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1));
         this.loading = true;
-        return Collections.singletonList(dorisSinkState);
+        this.lastCheckpointId = checkpointId;
+        return Collections.singletonList(new DorisSinkState(labelPrefix, lastCheckpointId));
     }
 
     @Override