wq557520 opened a new issue, #9074:
URL: https://github.com/apache/seatunnel/issues/9074

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
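   I consume canal data from Kafka and write it to StarRocks. When I check the consumer group offsets with `bin/kafka-consumer-groups.sh --bootstrap-server node:9092 --group xxxxxxxxxxx --describe` several times in a row, different values appear each time. The job actually consumes from the specified start position, but judging by the reported offsets there is no lag backlog. To confirm the problem, I also sink to a local file at the same time: the file keeps receiving historical data, yet the offsets shown never match.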
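
For anyone trying to reproduce this, the repeated `--describe` checks can be compared mechanically. The following is a minimal sketch, not part of the original report: it assumes the tool prints a header row containing `TOPIC`, `PARTITION`, `CURRENT-OFFSET` and `LOG-END-OFFSET` columns, and the function names and the sample rows are hypothetical, made up only for illustration.

```python
def _to_int(value):
    # The tool prints "-" when no offset has been committed yet.
    return int(value) if value.isdigit() else None


def parse_describe(output):
    """Map (topic, partition) -> (committed_offset, log_end_offset)."""
    rows = {}
    header = None
    for line in output.splitlines():
        cols = line.split()
        if not cols:
            continue
        if "CURRENT-OFFSET" in cols:
            header = cols  # locate columns by name, not by position
            continue
        if header is None:
            continue  # skip warnings printed before the header
        rec = dict(zip(header, cols))
        if not rec.get("PARTITION", "").isdigit():
            continue
        key = (rec["TOPIC"], int(rec["PARTITION"]))
        rows[key] = (_to_int(rec["CURRENT-OFFSET"]), _to_int(rec["LOG-END-OFFSET"]))
    return rows


def committed_progress(before, after):
    """How far the committed offset moved per partition between two snapshots."""
    progress = {}
    for key, (committed_after, _) in after.items():
        committed_before = before.get(key, (None, None))[0]
        if committed_before is not None and committed_after is not None:
            progress[key] = committed_after - committed_before
    return progress


# Hypothetical snapshots, invented for illustration only.
SNAPSHOT_1 = """
GROUP      TOPIC          PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
seatunnel9 SProd.gaea-pay 0         1000           1500           500 -           -    -
"""
SNAPSHOT_2 = """
GROUP      TOPIC          PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
seatunnel9 SProd.gaea-pay 0         1000           1600           600 -           -    -
"""

if __name__ == "__main__":
    first = parse_describe(SNAPSHOT_1)
    second = parse_describe(SNAPSHOT_2)
    print(committed_progress(first, second))
```

Feeding two real captures of the command output into `parse_describe` and comparing them with `committed_progress` would show whether the committed offset is moving at all between checks, which can then be set against what the LocalFile sink is writing.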
   
   ### SeaTunnel Version
   
   2.3.9
   
   ### SeaTunnel Config
   
   ```conf
   env {
     execution.parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 300000
     checkpoint.timeout: 300000
     read_limit.rows_per_second=1000
   }
   
   source {
     Kafka {
      plugin_output = "gaea_mysql_cdc"
       
      schema = {
        fields {
           Id = "bigint"
           OrderNo = "string"
           BankNo = "string"
           ThirdOrderNo = "string"
           OrderType = "bigint"
           PaymentMethod = "smallint"
           PayBank = "bigint"
        
          
           }
       }
   
   
       format = canal_json
       database.include = ".*"
       table.include = "^payorder.*"
       start_mode = "timestamp"
       start_mode.timestamp = 1743132600000
       topic = "SProd.gaea-pay"
       bootstrap.servers = "node132:9092"
       consumer.group = "seatunnel9"
   
   
       kafka.config = {
         client.id = client_1
         max.poll.records = 100
         auto.offset.reset = "earliest"
         enable.auto.commit = "false"
       }
     }
   }
   
   transform {
     Metadata {
       plugin_input = "gaea_mysql_cdc"
       plugin_output = "add_table_name_output"
       metadata_fields {
          Table = __tablename
       }
     }
   
   
    Sql {
       plugin_input = "add_table_name_output"
       plugin_output = "table_name_split_output"
       query = "select *,SUBSTRING(REGEXP_REPLACE(__tablename, '[^0-9]', ''), 1, 6) as __month,SUBSTRING(REGEXP_REPLACE(__tablename, '[^0-9]', ''), 7) as __orgid from add_table_name_output"
   
     }
   }
   
   
   
   sink {
    
   LocalFile {
       plugin_input = "table_name_split_output"
       path = "/root/seatunnel/prod/canal/text/${table_name}"
       tmp_path = "/root/seatunnel/prod/canal/tmp"
       file_format_type = "text"
       enable_header_write = true
       encoding = "utf-8"
   }
    StarRocks {
      plugin_input = "table_name_split_output"
   
     base-url = "jdbc:mysql://192.168.31:9030/reports"
     nodeUrls = ["192.1631:8030","192.16832:8030","192.10.133:8030"]
     database = "prod_reports"
    # If this value is not set, the table name is the same as the upstream table name
     table = "order"
     username = "brod"
     password = "bi3"
   
       # Batch write configuration
     batch_max_rows = 100
     batch_max_bytes = 5242880
   # Whether to sync upsert/delete events; only supported for primary key model tables
     enable_upsert_delete = true
       
      }
   
   }
   ```
   
   ### Running Command
   
   ```shell
   /opt/seatunnel/bin/seatunnel.sh --config /root/seatunnel/prod/canal/xxxx.config  -from nothgin --async -n xxxxxxxx
   ```
   
   ### Error Exception
   
   ```log
   The consumer offsets are displayed incorrectly
   ```
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org