Hisoka-X commented on code in PR #9735:
URL: https://github.com/apache/seatunnel/pull/9735#discussion_r2289752634
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/offset/BinlogOffsetFactory.java:
##########
@@ -76,6 +76,7 @@ public Offset specific(String filename, Long position) {
@Override
public Offset timestamp(long timestamp) {
- throw new UnsupportedOperationException("not supported create new
Offset by timestamp.");
+ // mysql binlog timestamp is second, so we need to divide 1000
Review Comment:
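Shouldn't this be milliseconds? The other timestamps in this PR appear to be in milliseconds already: `searchBinlogName` takes `targetMs`, and `handleEvent` compares `event.getHeader().getTimestamp()` directly against the target timestamp.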
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java:
##########
@@ -185,4 +192,87 @@ private static Map<String, String> querySystemVariables(
return variables;
}
+
+ public static BinlogOffset findBinlogOffsetBytimestamp(
+ JdbcConnection jdbc, BinaryLogClient client, long timestamp) {
+ final String showBinaryLogStmt = "SHOW BINARY LOGS";
Review Comment:
cc @hawk9821. After this PR is merged, should we update this for MySQL 8.4?
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlConnectionUtils.java:
##########
@@ -185,4 +192,87 @@ private static Map<String, String> querySystemVariables(
return variables;
}
+
+ public static BinlogOffset findBinlogOffsetBytimestamp(
+ JdbcConnection jdbc, BinaryLogClient client, long timestamp) {
+ final String showBinaryLogStmt = "SHOW BINARY LOGS";
+ List<String> binlogFiles = new ArrayList<>();
+ JdbcConnection.ResultSetConsumer rsc =
+ rs -> {
+ while (rs.next()) {
+ String fileName = rs.getString(1);
+ long fileSize = rs.getLong(2);
+ if (fileSize > 0) {
+ binlogFiles.add(fileName);
+ }
+ }
+ };
+ try {
+ jdbc.query(showBinaryLogStmt, rsc);
+ if (binlogFiles.isEmpty()) {
+ return BinlogOffset.INITIAL_OFFSET;
+ }
+ String binlogName = searchBinlogName(client, timestamp,
binlogFiles);
+ return new BinlogOffset(binlogName, 0);
+ } catch (Exception e) {
+ throw new SeaTunnelException(e);
+ }
+ }
+
+ private static String searchBinlogName(
+ BinaryLogClient client, long targetMs, List<String> binlogFiles)
+ throws IOException, InterruptedException {
+ int startIdx = 0;
+ int endIdx = binlogFiles.size() - 1;
+
+ while (startIdx <= endIdx) {
+ int mid = startIdx + (endIdx - startIdx) / 2;
+ long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
+ if (midTs < targetMs) {
+ startIdx = mid + 1;
+ } else if (targetMs < midTs) {
+ endIdx = mid - 1;
+ } else {
+ return binlogFiles.get(mid);
+ }
+ }
+
+ return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
+ }
+
+ public static long getBinlogTimestamp(BinaryLogClient client, String
binlogFile)
+ throws IOException, InterruptedException {
+
+ ArrayBlockingQueue<Long> binlogTimestamps = new
ArrayBlockingQueue<>(1);
Review Comment:
Why do we still use a queue here?
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java:
##########
@@ -204,6 +229,81 @@ public static BinlogOffset getBinlogPosition(Map<String,
?> offset) {
}
}
+ private class TimestampFilterMySqlStreamingChangeEventSource
+ extends MySqlStreamingChangeEventSource {
+
+ private final Long targetTimestamp;
+ private long logTimestamp;
+ private boolean loggedWaitingMessage;
+ private final long LOG_INTERVAL_MS = 10000;
+
+ public TimestampFilterMySqlStreamingChangeEventSource(
+ MySqlConnectorConfig connectorConfig,
+ MySqlConnection connection,
+ JdbcSourceEventDispatcher<MySqlPartition> dispatcher,
+ ErrorHandler errorHandler,
+ Clock clock,
+ MySqlTaskContext taskContext,
+ MySqlStreamingChangeEventSourceMetrics metrics,
+ Long targetTimestamp) {
+ super(
+ connectorConfig,
+ connection,
+ dispatcher,
+ errorHandler,
+ clock,
+ taskContext,
+ metrics);
+ this.targetTimestamp = targetTimestamp;
+ }
+
+ @Override
+ protected void handleEvent(
+ MySqlPartition partition, MySqlOffsetContext offsetContext,
Event event) {
+ if (event == null) {
+ super.handleEvent(partition, offsetContext, event);
+ return;
+ }
+
+ long eventTs = event.getHeader().getTimestamp();
+ if (eventTs == 0 || targetTimestamp == null || targetTimestamp ==
0) {
+ super.handleEvent(partition, offsetContext, event);
+ return;
+ }
+ boolean shouldSkip = eventTs < targetTimestamp;
+ if (shouldSkip) {
+ if (!loggedWaitingMessage) {
+ log.info(
+ "skip binlog, currentTime:{}, filterTime:{}",
eventTs, targetTimestamp);
+ loggedWaitingMessage = true;
+ logTimestamp = eventTs;
+ }
+ if (eventTs - logTimestamp >= LOG_INTERVAL_MS) {
+ loggedWaitingMessage = false;
+ }
+ updateOffsetPosition(offsetContext, event.getHeader());
+ return;
+ }
+
+ super.handleEvent(partition, offsetContext, event);
+ }
+
+ private void updateOffsetPosition(
Review Comment:
Could you explain why we must update `MySqlOffsetContext` when skipping data?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]