davidzollo commented on issue #8388: URL: https://github.com/apache/seatunnel/issues/8388#issuecomment-2673812869
Streaming task from Kafka to Doris ``` env { execution.parallelism = 4 # It is recommended to adjust according to the number of Kafka partitions, keeping it consistent with the partition count job.mode = "STREAMING" checkpoint.interval = 30000 checkpoint.timeout = 600000 # Your current rate limits seem high but reasonable, ~700MB/s read_limit.bytes_per_second=700000000 read_limit.rows_per_second=40000 } source { Kafka { result_table_name = "kafka_log" #Kafka server address bootstrap.servers = "xxxxx" topic = "xxxx" consumer.group = "kafka2table" start_mode = "earliest" kafka.config = { "fetch.min.bytes" = "1048576" # 1MB, increase the minimum batch fetch size "fetch.max.wait.ms" = "500" # Wait time when data is insufficient "max.partition.fetch.bytes" = "5242880" # 5MB, maximum data fetch per partition "max.poll.records" = "5000" # Maximum number of records per poll "isolation.level" = "read_committed" # Ensure data consistency } format = json schema={ fields={ ev=STRING pg=STRING uuid=STRING userId=bigint fromDevice=STRING ip=STRING source=STRING np=STRING lp=STRING tg=STRING ch=STRING v=STRING nt=STRING wifi=STRING dbd=STRING dmd=STRING bs=STRING browser_version=STRING ext=STRING sid=STRING timestamp=bigint reporttime=bigint } } } } transform { Sql { source_table_name = "kafka_log" result_table_name = "log" query = "select ev as event,pg as page,uuid,userId as userid,fromDevice as platform,ip,source,np as nextpage,lp as lastpage,tg as target,ch as channel,v as version,nt as network,wifi,dbd as device_brand,dmd as device_model,bs as browser,browser_version,ext as extra,sid as sessionid,timestamp,reporttime,CURRENT_DATE as dt from kafka_log" } } sink { Doris { source_table_name = "log" fenodes = "dxxx" username = xxx password = "xxxx" table.identifier = "ods.ods_log" sink.label-prefix = "log" sink.enable-2pc = "false" doris.batch.size = 500000 sink.buffer-size = 104857600 sink.max-retries = 5 doris.config { format="json" read_json_by_line="true" } } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org