kwonder0926 opened a new issue, #49968: URL: https://github.com/apache/doris/issues/49968
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no similar issues.

### Version

doris-2.1.8
flink-1.17.2
mongoDB-4.4.18

### What's Wrong?

I am using Flink to sync data from MongoDB to Doris in real time. For a sync-latency test, I created a new Doris table with the following DDL:

```sql
CREATE TABLE test_sync_perf_test (
    id VARCHAR(36) NOT NULL,
    source_timestamp DATETIME(3),
    flink_write_time DATETIME(3),
    doris_current_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
    device_id INT,
    sensor_code VARCHAR(24),
    temperature DECIMAL(5,2),
    humidity DECIMAL(5,2),
    pressure DECIMAL(7,2),
    status TINYINT,
    voltage FLOAT,
    `current` FLOAT,
    gps_lng DECIMAL(10,6),
    gps_lat DECIMAL(10,6),
    firmware_ver VARCHAR(16),
    error_code SMALLINT,
    data_quality TINYINT,
    checksum BIGINT,
    rand_val INT,
    reserved1 VARCHAR(32),
    reserved2 BIGINT
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES (
    "replication_num" = "1",
    "storage_format" = "V2",
    "enable_unique_key_merge_on_write" = "true"
);
```

Three of these fields are the ones that matter for the test; the rest only pad the rows:

- **source_timestamp**: the time the record is written to MongoDB
- **flink_write_time**: the time the record leaves the flink-doris-connector after processing
- **doris_current_time**: the Doris ingestion time, which is supposed to be captured automatically when the row is written (see the small illustration below)
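To make the last point concrete, here is a minimal sketch of what "captured automatically" relies on; the literal values and the `demo-0001` id are made up, and only the table and column names come from the DDL above:

```sql
-- Hypothetical example: doris_current_time is omitted from the column list,
-- so Doris is expected to fill it from the DEFAULT CURRENT_TIMESTAMP(3) clause.
INSERT INTO test_sync_perf_test (id, source_timestamp, flink_write_time, device_id)
VALUES ('demo-0001', '2025-04-10 12:00:00.000', '2025-04-10 12:00:00.120', 1);

SELECT id, source_timestamp, flink_write_time, doris_current_time
FROM test_sync_perf_test
WHERE id = 'demo-0001';
```

The same default-filling should apply when the flink-doris-connector writes rows without that column, and that filled-in value is what the test compares against source_timestamp.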
I used the following Java program to generate data and insert it into MongoDB:

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import org.bson.types.ObjectId;

import java.math.BigDecimal;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class MongoDataGenerator {
    // Default parameter values
    private static final int DEFAULT_TOTAL = 1_000_000;
    private static final int DEFAULT_BATCH_SIZE = 500;
    private static final int DEFAULT_RATE = 5000; // records per second
    private static final int DEFAULT_THREADS = 1;
    private static int incr = 0;

    public static void main(String[] args) {
        // Parse command-line arguments
        Map<String, String> params = parseArguments(args);
        if (params.containsKey("-h")) {
            printHelp();
            return;
        }

        // Read parameter values
        int total = getIntParam(params, "--total", DEFAULT_TOTAL);
        int batchSize = getIntParam(params, "--batch-size", DEFAULT_BATCH_SIZE);
        int ratePerSecond = getIntParam(params, "--rate", DEFAULT_RATE);
        int threads = getIntParam(params, "--threads", DEFAULT_THREADS);
        validateParameters(total, batchSize, ratePerSecond, threads);
        System.out.printf("Starting data generator [total=%d] [batch size=%d] [rate limit=%d records/s] [threads=%d]%n",
                total, batchSize, ratePerSecond, threads);

        // Compute how many batches are actually needed
        int totalBatches = (int) Math.ceil((double) total / batchSize);
        long delayMicroseconds = (long) (1_000_000.0 / (ratePerSecond / (double) batchSize));

        ExecutorService executor = Executors.newFixedThreadPool(threads);
        MongoClient client = createMongoClient();
        MongoCollection<Document> collection = client.getDatabase("xlsms")
                .getCollection("test_sync_perf_test");
        AtomicLong counter = new AtomicLong(0);
        long startTime = System.currentTimeMillis();
        Random random = new Random();

        // Scheduled thread pool used for rate control
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Progress monitoring task
        scheduler.scheduleAtFixedRate(() -> {
            long count = counter.get();
            if (count >= total) return;
            long elapsed = System.currentTimeMillis() - startTime;
            System.out.printf("Progress: %d/%d (%.1f%%), current rate: %.2f records/s%n",
                    count, total, (count * 100.0 / total),
                    (count * 1000.0 / (elapsed == 0 ? 1 : elapsed)));
        }, 1, 1, TimeUnit.SECONDS);

        // Data generation tasks
        for (int i = 0; i < totalBatches; i++) {
            final int currentBatch = i;
            scheduler.schedule(() -> {
                int actualBatchSize = Math.min(batchSize, total - (currentBatch * batchSize));
                if (actualBatchSize <= 0) return;
                List<Document> batch = generateBatch(actualBatchSize, random);
                executor.submit(() -> {
                    collection.insertMany(batch);
                    counter.addAndGet(actualBatchSize);
                });
            }, i * delayMicroseconds, TimeUnit.MICROSECONDS);
        }

        // Shut down resources
        scheduler.schedule(() -> {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
                    System.err.println("Tasks did not finish within the allotted time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            client.close();
            System.out.println("Data generation finished");
            scheduler.shutdown();
        }, totalBatches * delayMicroseconds, TimeUnit.MICROSECONDS);
    }

    private static Map<String, String> parseArguments(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                if (i + 1 < args.length && !args[i + 1].startsWith("-")) {
                    params.put(args[i], args[i + 1]);
                    i++;
                } else {
                    params.put(args[i], "");
                }
            } else if (args[i].equals("-h")) {
                params.put("-h", "");
            }
        }
        return params;
    }

    private static int getIntParam(Map<String, String> params, String key, int defaultValue) {
        return params.containsKey(key) ? Integer.parseInt(params.get(key)) : defaultValue;
    }

    private static void validateParameters(int total, int batchSize, int rate, int threads) {
        if (total <= 0) throw new IllegalArgumentException("Total record count must be greater than 0");
        if (batchSize <= 0) throw new IllegalArgumentException("Batch size must be greater than 0");
        if (rate <= 0) throw new IllegalArgumentException("Rate limit must be greater than 0");
        if (threads <= 0) throw new IllegalArgumentException("Thread count must be greater than 0");
        if (batchSize > rate) System.err.println("Warning: batch size exceeds the rate limit; the actual rate may not reach the limit");
    }

    private static MongoClient createMongoClient() {
        return MongoClients.create("mongodb://xinling:2&pxrpWBFeEkCPu9@10.250.250.11:27017," +
                "10.250.250.143:27017,10.250.250.218:27017/?replicaSet=rep1");
        // return MongoClients.create("mongodb://stuser:stpw@192.168.10.102:27017," +
        //         "192.168.10.103:27017,192.168.10.104:27017/?replicaSet=rs0");
    }

    private static List<Document> generateBatch(int batchSize, Random random) {
        ZonedDateTime beijingTime = ZonedDateTime.now(ZoneId.of("Asia/Shanghai"));
        List<Document> batch = new ArrayList<>(batchSize);
        for (int j = 0; j < batchSize; j++) {
            Document doc = new Document()
                    .append("_id", new ObjectId())
                    .append("id", incr)
                    .append("device_id", ThreadLocalRandom.current().nextInt(1, 1000))
                    .append("sensor_code", "SENSOR-" + String.format("%04d", j))
                    .append("temperature", BigDecimal.valueOf(20 + ThreadLocalRandom.current().nextDouble(15)))
                    .append("source_timestamp", Date.from(beijingTime.toInstant()))
                    .append("humidity", 30 + random.nextDouble() * 30)
                    .append("pressure", 1000 + random.nextDouble() * 50)
                    .append("status", random.nextInt(2))
                    .append("voltage", 3 + random.nextDouble() * 2)
                    .append("current", random.nextDouble())
                    .append("gps_lng", 116 + random.nextDouble() * 2)
                    .append("gps_lat", 39 + random.nextDouble() * 2)
                    .append("firmware_ver", "v" + (random.nextInt(5) + 1) + "." + random.nextInt(10) + "." + random.nextInt(10))
                    .append("error_code", random.nextInt(5))
                    .append("data_quality", random.nextInt(3))
                    .append("checksum", random.nextLong())
                    .append("rand_val", random.nextInt(100))
                    .append("reserved1", "backup-" + random.nextInt(100))
                    .append("reserved2", random.nextLong());
            // System.out.println(incr + ":" + doc);
            incr += 1;
            batch.add(doc);
        }
        return batch;
    }

    private static void printHelp() {
        System.out.println("MongoDB stress-test data generator");
        System.out.println("Options:");
        System.out.println("  --total <number>       total number of records (default: " + DEFAULT_TOTAL + ")");
        System.out.println("  --batch-size <number>  records per batch (default: " + DEFAULT_BATCH_SIZE + ")");
        System.out.println("  --rate <number>        maximum write rate in records/s (default: " + DEFAULT_RATE + ")");
        System.out.println("  --threads <number>     number of writer threads (default: " + DEFAULT_THREADS + ")");
        System.out.println("  -h                     show this help message");
    }
}
```

However, after all the data had been synced to Doris, I noticed that for many records **doris_current_time** was earlier than **source_timestamp**, with the discrepancy approaching Flink's checkpoint interval in seconds. I can confirm that the clock skew among my MongoDB, Flink, and Doris servers does not exceed 1 second. Reviewing MongoDB's oplog suggests the issue does not originate in MongoDB, and the **flink_write_time** values look normal, i.e. typical latency. I therefore suspect the issue lies within Doris. Could this be a bug, or a specific mechanism of Doris? Please advise.
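To quantify the observation, a query along these lines can surface the offending rows and the size of the offset; this is a sketch that reuses only the column names from the DDL above, and the unit and LIMIT are arbitrary:

```sql
-- Sketch: rows where the Doris-side timestamp precedes the MongoDB-side timestamp,
-- with the offset in seconds (negative means Doris stamped the row before MongoDB did).
SELECT id,
       source_timestamp,
       doris_current_time,
       TIMESTAMPDIFF(SECOND, source_timestamp, doris_current_time) AS offset_seconds
FROM test_sync_perf_test
WHERE doris_current_time < source_timestamp
ORDER BY offset_seconds ASC
LIMIT 100;
```

With the 5 s checkpoint interval configured in the reproduction command below, offsets clustering near -5 s would match the pattern described above.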
### What You Expected?

I would like this timestamp to be more reasonable, at least later than my source_timestamp; failing that, I want to understand why this happens, if only to rule out a problem on the Doris side.

### How to Reproduce?

Here is the Flink sync command I used:

```bash
./bin/flink run \
    -Dexecution.checkpointing.interval=5s \
    -Dparallelism.default=2 \
    -Dpipeline.operator-chaining=false \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    ./lib/flink-doris-connector-1.17-25.1.0-SNAPSHOT.jar \
    mongodb-sync-database \
    --job-name mongo2Doris \
    --database xlsms \
    --table-prefix db_ \
    --schema-change-mode sql_parser \
    --use-new-schema-change true \
    --mongodb-conf hosts=192.168.10.102:27017,192.168.10.103:27017,192.168.10.104:27017 \
    --mongodb-conf username=stuser \
    --mongodb-conf password=stpw \
    --mongodb-conf database=xlsms \
    --mongodb-conf scan.startup.mode=initial \
    --mongodb-conf schema.sample-percent=0.2 \
    --including-tables ".*" \
    --sink-conf fenodes=192.168.10.102:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://192.168.10.102:9030 \
    --sink-conf sink.label-prefix=sms \
    --sink-conf sink.enable-2pc=false \
    --single-sink true \
    --table-conf replication_num=1
```

### Anything Else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)