wq557520 opened a new issue, #9074: URL: https://github.com/apache/seatunnel/issues/9074
### Search before asking - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened 我从kafka消费canal数据,写入starrocks,用bin/kafka-consumer-groups.sh --bootstrap-server node:9092 --group xxxxxxxxxxx --describe 看游标,连续查看几次,会出现多个值,实际从指定位置消费,但是从游标看lag没有堆积。为了确认这个问题,我还同时slink到本地文件,文件一直在写入历史数据,游标显示始终对不上 ### SeaTunnel Version 2.3.9 ### SeaTunnel Config ```conf env { execution.parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 300000 checkpoint.timeout: 300000 read_limit.rows_per_second=1000 } source { Kafka { plugin_output = "gaea_mysql_cdc" schema = { fields { Id = "bigint" OrderNo = "string" BankNo = "string" ThirdOrderNo = "string" OrderType = "bigint" PaymentMethod = "smallint" PayBank = "bigint" } } format = canal_json database.include = ".*" table.include = "^payorder.*" start_mode = "timestamp" start_mode.timestamp = 1743132600000 topic = "SProd.gaea-pay" bootstrap.servers = "node132:9092" consumer.group = "seatunnel9" kafka.config = { client.id = client_1 max.poll.records = 100 auto.offset.reset = "earliest" enable.auto.commit = "false" } } } transform { Metadata { plugin_input = "gaea_mysql_cdc" plugin_output = "add_table_name_output" metadata_fields { Table = __tablename } } Sql { plugin_input = "add_table_name_output" plugin_output = "table_name_split_output" query = "select *,SUBSTRING(REGEXP_REPLACE(__tablename, '[^0-9]', ''), 1, 6) as __month,SUBSTRING(REGEXP_REPLACE(__tablename, '[^0-9]', ''), 7) as __orgid from add_table_name_output" } } sink { LocalFile { plugin_input = "table_name_split_output" path = "/root/seatunnel/prod/canal/text/${table_name}" tmp_path = "/root/seatunnel/prod/canal/tmp" file_format_type = "text" enable_header_write = true encoding = "utf-8" } StarRocks { plugin_input = "table_name_split_output" base-url = "jdbc:mysql://192.168.31:9030/reports" nodeUrls = ["192.1631:8030","192.16832:8030","192.10.133:8030"] database = "prod_reports" # 如果没有设置该值,则表名与上游表名相同 table = "order" username = "brod" password = "bi3" # 批量写入配置 batch_max_rows = 100 batch_max_bytes = 5242880 # 是否开启upsert/delete事件的同步,仅仅支持主键模型的表 enable_upsert_delete = true } } ``` ### Running Command ```shell /opt/seatunnel/bin/seatunnel.sh --config /root/seatunnel/prod/canal/xxxx.config -from nothgin --async -n xxxxxxxx ``` ### Error Exception ```log 游标显示不对 ``` ### Zeta or Flink or Spark Version _No response_ ### Java or Scala Version _No response_ ### Screenshots _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org