kwonder0926 opened a new issue, #49968:
URL: https://github.com/apache/doris/issues/49968

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no 
similar issues.
   
   
   ### Version
   
   doris-2.1.8
   flink-1.17.2
   mongoDB-4.4.18
   
   ### What's Wrong?
   
   I am using Flink to sync data from MongoDB to Doris in real time. For a sync-latency test, I created a new Doris table with the following DDL:
   
   ```sql
   CREATE TABLE test_sync_perf_test (
       id VARCHAR(36) NOT NULL,
       source_timestamp DATETIME(3),
       flink_write_time DATETIME(3),
       doris_current_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
       device_id INT,
       sensor_code VARCHAR(24),
       temperature DECIMAL(5,2),
       humidity DECIMAL(5,2),
       pressure DECIMAL(7,2),
       status TINYINT,
       voltage FLOAT,
       `current` FLOAT,
       gps_lng DECIMAL(10,6),
       gps_lat DECIMAL(10,6),
       firmware_ver VARCHAR(16),
       error_code SMALLINT,
       data_quality TINYINT,
       checksum BIGINT,
       rand_val INT,
       reserved1 VARCHAR(32),
       reserved2 BIGINT
   )
   UNIQUE KEY(id)
   DISTRIBUTED BY HASH(id) BUCKETS 3
   PROPERTIES (
       "replication_num" = "1",
       "storage_format" = "V2",
       "enable_unique_key_merge_on_write" = "true"
   );
   ```
   
   Among the columns above, three fields are the ones that matter; the rest are just padding for the test:
   
   - **source_timestamp**: the MongoDB data write time
   - **flink_write_time**: the time recorded after processing by flink-doris-connector
   - **doris_current_time**: the Doris ingestion time, which is supposed to be captured automatically when the row is written (see the probe sketch below)
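   
   Of the three, **doris_current_time** is never supplied by the sync job, so Doris fills it from the `DEFAULT CURRENT_TIMESTAMP(3)` clause at ingestion. As a baseline, here is a minimal probe that bypasses Flink entirely (just a sketch: `probe-1` is a made-up id, and it assumes `NOW(3)` is available in this version):
   
   ```sql
   -- Insert one row directly, leaving doris_current_time unset so the
   -- column default has to fill it.
   INSERT INTO test_sync_perf_test (id, source_timestamp) VALUES ('probe-1', NOW(3));
   
   -- If the default is taken at insert time, doris_current_time should be
   -- at or after source_timestamp for this row.
   SELECT id, source_timestamp, doris_current_time
   FROM test_sync_perf_test
   WHERE id = 'probe-1';
   ```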
   
   I used the following Java code to generate data and insert it into MongoDB automatically:
   
   ```java
   import com.mongodb.client.MongoClient;
   import com.mongodb.client.MongoClients;
   import com.mongodb.client.MongoCollection;
   import org.bson.Document;
   import org.bson.types.ObjectId;
   
   import java.math.BigDecimal;
   import java.time.ZoneId;
   import java.time.ZonedDateTime;
   import java.util.ArrayList;
   import java.util.Date;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Random;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ThreadLocalRandom;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicLong;
   
   public class MongoDataGenerator {
   
       // Parameter defaults
       private static final int DEFAULT_TOTAL = 1_000_000;
       private static final int DEFAULT_BATCH_SIZE = 500;
       private static final int DEFAULT_RATE = 5000; // records/second
       private static final int DEFAULT_THREADS = 1;
   
       private static int incr = 0;
   
       public static void main(String[] args) {
           // Parse command-line arguments
           Map<String, String> params = parseArguments(args);
           if (params.containsKey("-h")) {
               printHelp();
               return;
           }
   
           // Read parameter values
           int total = getIntParam(params, "--total", DEFAULT_TOTAL);
           int batchSize = getIntParam(params, "--batch-size", DEFAULT_BATCH_SIZE);
           int ratePerSecond = getIntParam(params, "--rate", DEFAULT_RATE);
           int threads = getIntParam(params, "--threads", DEFAULT_THREADS);
   
           validateParameters(total, batchSize, ratePerSecond, threads);
   
           System.out.printf("Starting data generator [total=%d] [batch size=%d] [rate limit=%d records/s] [threads=%d]%n",
                   total, batchSize, ratePerSecond, threads);
   
           // Compute the number of batches needed and the delay between batches
           int totalBatches = (int) Math.ceil((double) total / batchSize);
           long delayMicroseconds = (long) (1_000_000.0 / (ratePerSecond / (double) batchSize));
   
           ExecutorService executor = Executors.newFixedThreadPool(threads);
           MongoClient client = createMongoClient();
           MongoCollection<Document> collection = client.getDatabase("xlsms")
                   .getCollection("test_sync_perf_test");
   
           AtomicLong counter = new AtomicLong(0);
           long startTime = System.currentTimeMillis();
           Random random = new Random();
   
           // Single-threaded scheduled executor used for rate control
           ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
   
           // Progress-monitoring task
           scheduler.scheduleAtFixedRate(() -> {
               long count = counter.get();
               if (count >= total) return;
   
               long elapsed = System.currentTimeMillis() - startTime;
               System.out.printf("Progress: %d/%d (%.1f%%) current rate: %.2f records/s%n",
                       count, total,
                       (count * 100.0 / total),
                       (count * 1000.0 / (elapsed == 0 ? 1 : elapsed)));
           }, 1, 1, TimeUnit.SECONDS);
   
           // Data-generation tasks, one per batch, spaced out for rate limiting
           for (int i = 0; i < totalBatches; i++) {
               final int currentBatch = i;
               scheduler.schedule(() -> {
                   int actualBatchSize = Math.min(batchSize, total - (currentBatch * batchSize));
                   if (actualBatchSize <= 0) return;
   
                   List<Document> batch = generateBatch(actualBatchSize, random);
                   executor.submit(() -> {
                       collection.insertMany(batch);
                       counter.addAndGet(actualBatchSize);
                   });
               }, i * delayMicroseconds, TimeUnit.MICROSECONDS);
           }
   
           // Shut down and release resources after the last batch is scheduled
           scheduler.schedule(() -> {
               executor.shutdown();
               try {
                   if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
                       System.err.println("Tasks did not finish within the allotted time");
                   }
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
               client.close();
               System.out.println("Data generation finished");
               scheduler.shutdown();
           }, totalBatches * delayMicroseconds, TimeUnit.MICROSECONDS);
       }
   
       private static Map<String, String> parseArguments(String[] args) {
           Map<String, String> params = new HashMap<>();
           for (int i = 0; i < args.length; i++) {
               if (args[i].startsWith("--")) {
                   if (i + 1 < args.length && !args[i + 1].startsWith("-")) {
                       params.put(args[i], args[i + 1]);
                       i++;
                   } else {
                       params.put(args[i], "");
                   }
               } else if (args[i].equals("-h")) {
                   params.put("-h", "");
               }
           }
           return params;
       }
   
       private static int getIntParam(Map<String, String> params, String key, int defaultValue) {
           return params.containsKey(key) ? Integer.parseInt(params.get(key)) : defaultValue;
       }
   
       private static void validateParameters(int total, int batchSize, int rate, int threads) {
           if (total <= 0) throw new IllegalArgumentException("total must be greater than 0");
           if (batchSize <= 0) throw new IllegalArgumentException("batch size must be greater than 0");
           if (rate <= 0) throw new IllegalArgumentException("rate limit must be greater than 0");
           if (threads <= 0) throw new IllegalArgumentException("thread count must be greater than 0");
           if (batchSize > rate) System.err.println("Warning: batch size exceeds the rate limit; the actual rate may fall short of the limit");
       }
   
       private static MongoClient createMongoClient() {
           return MongoClients.create("mongodb://xinling:2&pxrpWBFeEkCPu9@10.250.250.11:27017," +
                   "10.250.250.143:27017,10.250.250.218:27017/?replicaSet=rep1");
   //        return MongoClients.create("mongodb://stuser:stpw@192.168.10.102:27017," +
   //                "192.168.10.103:27017,192.168.10.104:27017/?replicaSet=rs0");
       }
   
       private static List<Document> generateBatch(int batchSize, Random random) {
           // Runs on the single-threaded scheduler, so the unsynchronized incr counter is safe
           ZonedDateTime beijingTime = ZonedDateTime.now(ZoneId.of("Asia/Shanghai"));
           List<Document> batch = new ArrayList<>(batchSize);
           for (int j = 0; j < batchSize; j++) {
               Document doc = new Document()
                       .append("_id", new ObjectId())
                       .append("id", incr)
                       .append("device_id", ThreadLocalRandom.current().nextInt(1, 1000))
                       .append("sensor_code", "SENSOR-" + String.format("%04d", j))
                       .append("temperature", BigDecimal.valueOf(20 + ThreadLocalRandom.current().nextDouble(15)))
                       .append("source_timestamp", Date.from(beijingTime.toInstant()))
                       .append("humidity", 30 + random.nextDouble() * 30)
                       .append("pressure", 1000 + random.nextDouble() * 50)
                       .append("status", random.nextInt(2))
                       .append("voltage", 3 + random.nextDouble() * 2)
                       .append("current", random.nextDouble())
                       .append("gps_lng", 116 + random.nextDouble() * 2)
                       .append("gps_lat", 39 + random.nextDouble() * 2)
                       .append("firmware_ver", "v" + (random.nextInt(5) + 1) + "." + random.nextInt(10) + "." + random.nextInt(10))
                       .append("error_code", random.nextInt(5))
                       .append("data_quality", random.nextInt(3))
                       .append("checksum", random.nextLong())
                       .append("rand_val", random.nextInt(100))
                       .append("reserved1", "backup-" + random.nextInt(100))
                       .append("reserved2", random.nextLong());
   //            System.out.println(incr + ":" + doc);
               incr += 1;
               batch.add(doc);
           }
           return batch;
       }
   
       private static void printHelp() {
           System.out.println("MongoDB stress-test data generator");
           System.out.println("Options:");
           System.out.println("  --total <number>      total records (default: " + DEFAULT_TOTAL + ")");
           System.out.println("  --batch-size <number> records per batch (default: " + DEFAULT_BATCH_SIZE + ")");
           System.out.println("  --rate <number>       max write rate, records/s (default: " + DEFAULT_RATE + ")");
           System.out.println("  --threads <number>    writer threads (default: " + DEFAULT_THREADS + ")");
           System.out.println("  -h                    show this help message");
       }
   }
   ```
   
   However, after all the data had been synced to Doris, I noticed that for many records **doris_current_time** was earlier than **source_timestamp**, with the gap approaching Flink's `checkpoint_interval` in seconds.
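   
   To quantify this, I counted the affected rows and the largest gap with a query along these lines (a sketch assuming Doris's MySQL-style `timestampdiff`, where the second argument is subtracted from the third):
   
   ```sql
   -- Rows where the Doris-side default is EARLIER than the MongoDB write time,
   -- and the largest gap in seconds (positive = doris_current_time is earlier).
   SELECT count(*) AS early_rows,
          max(timestampdiff(SECOND, doris_current_time, source_timestamp)) AS max_gap_seconds
   FROM test_sync_perf_test
   WHERE doris_current_time < source_timestamp;
   ```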
   
   I can confirm that the clock difference among my MongoDB, Flink, and Doris servers does not exceed 1 second. After reviewing MongoDB's oplog, the issue does not appear to stem from MongoDB, and the **flink_write_time** values look normal, showing typical latency. Therefore, I suspect the issue lies within Doris.
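   
   (For reference, the clock check itself is trivial: run the query below on Doris while reading the system clock on the MongoDB and Flink hosts, and compare the results.)
   
   ```sql
   -- Evaluated by Doris at query time; compare against the wall clocks of the
   -- other servers (e.g. `date` on each host).
   SELECT NOW(3) AS doris_now;
   ```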
   
   Is this a bug, or some mechanism specific to Doris? Please advise.
   
   
![Image](https://github.com/user-attachments/assets/58d7c78b-04ad-47b8-b60f-4e02b23fb314)
   
   ### What You Expected?
   
   I expect **doris_current_time** to be at least later than **source_timestamp**. Failing that, I would like to understand why this happens, if only to rule out a problem in Doris.
   
   ### How to Reproduce?
   
   Here is the Flink sync command I used:
   
   ```bash
   ./bin/flink run \
       -Dexecution.checkpointing.interval=5s \
       -Dparallelism.default=2 \
       -Dpipeline.operator-chaining=false \
       -c org.apache.doris.flink.tools.cdc.CdcTools \
       ./lib/flink-doris-connector-1.17-25.1.0-SNAPSHOT.jar \
       mongodb-sync-database \
       --job-name mongo2Doris \
       --database xlsms \
       --table-prefix db_ \
       --schema-change-mode sql_parser \
       --use-new-schema-change true \
       --mongodb-conf hosts=192.168.10.102:27017,192.168.10.103:27017,192.168.10.104:27017 \
       --mongodb-conf username=stuser \
       --mongodb-conf password=stpw \
       --mongodb-conf database=xlsms \
       --mongodb-conf scan.startup.mode=initial \
       --mongodb-conf schema.sample-percent=0.2 \
       --including-tables ".*" \
       --sink-conf fenodes=192.168.10.102:8030 \
       --sink-conf username=root \
       --sink-conf password=123456 \
       --sink-conf jdbc-url=jdbc:mysql://192.168.10.102:9030 \
       --sink-conf sink.label-prefix=sms \
       --sink-conf sink.enable-2pc=false \
       --single-sink true \
       --table-conf replication_num=1
   ```
   
   ### Anything Else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   